From cb64352c75f0da8979c9addd37b2b78bb97d5301 Mon Sep 17 00:00:00 2001 From: Frank Heckenbach Date: Sun, 14 Apr 2013 15:38:07 -0400 Subject: Initial patch for output synchronization. See Savannah bug #33138. Based on work by David Boyce . --- job.c | 253 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) (limited to 'job.c') diff --git a/job.c b/job.c index 2e2d3ff..dedd526 100644 --- a/job.c +++ b/job.c @@ -242,6 +242,16 @@ unsigned long job_counter = 0; /* Number of jobserver tokens this instance is currently using. */ unsigned int jobserver_tokens = 0; + +#ifdef PARALLEL_SYNC +/* Semaphore for use in -j mode with parallel_sync. */ + +int sync_handle = -1; + +#define STREAM_OK(strm) ((fcntl (fileno ((strm)), F_GETFD) != -1) || (errno != EBADF)) + +#define FD_NOT_EMPTY(FD) ((FD) >= 0 && lseek ((FD), 0, SEEK_CUR) > 0) +#endif /* PARALLEL_SYNC */ #ifdef WINDOWS32 /* @@ -539,6 +549,184 @@ child_handler (int sig UNUSED) */ } +#ifdef PARALLEL_SYNC +/* Adds file descriptors to the child structure to support parallel_sync; + one for stdout and one for stderr as long as they are (a) open and + (b) separate. If stdout and stderr share a device they can share a + temp file too. */ +static int +assign_child_tempfiles (struct child *c, int combined) +{ + FILE *outstrm = NULL, *errstrm = NULL; + int o_ok, e_ok; + const char *suppressed = "parallel-sync suppressed: "; + char *failmode = NULL; + + /* Check status of stdout and stderr before hooking up temp files. */ + o_ok = STREAM_OK (stdout); + e_ok = STREAM_OK (stderr); + + /* The tmpfile() function returns a FILE pointer but those can be in + limited supply, so we immediately dup its file descriptor and keep + only that, closing the FILE pointer. */ + + if (combined) + { + if (!(outstrm = tmpfile ())) + failmode = "tmpfile()"; + else + errstrm = outstrm; + } + else if (o_ok && e_ok) + { + if (!(outstrm = tmpfile ()) || !(errstrm = tmpfile ())) + failmode = "tmpfile()"; + } + else if (o_ok) + { + if (!(outstrm = tmpfile ())) + failmode = "tmpfile()"; + } + else if (e_ok) + { + if (!(errstrm = tmpfile ())) + failmode = "tmpfile()"; + } + else + failmode = "stdout"; + + if (!failmode && outstrm) + { + if ((c->outfd = dup (fileno (outstrm))) == -1) + failmode = "dup2()"; + else + CLOSE_ON_EXEC (c->outfd); + } + + if (!failmode && errstrm) + { + if (combined) + c->errfd = c->outfd; + else + { + if ((c->errfd = dup (fileno (errstrm))) == -1) + failmode = "dup2()"; + else + CLOSE_ON_EXEC (c->errfd); + } + } + + if (failmode) + perror_with_name (suppressed, failmode); + + if (outstrm) + (void) fclose (outstrm); + + if (errstrm && errstrm != outstrm) + (void) fclose (errstrm); + + return failmode == NULL; +} + +/* Support routine for sync_output() */ +static void +pump_from_tmp_fd (int from_fd, int to_fd) +{ + ssize_t nleft, nwrite; + char buffer[8192]; + + if (lseek (from_fd, 0, SEEK_SET) == -1) + perror ("lseek()"); + + while (1) + { + char *write_buf = buffer; + EINTRLOOP (nleft, read (from_fd, buffer, sizeof (buffer))); + if (nleft < 0) + perror ("read()"); + if (nleft <= 0) + break; + while (nleft > 0) + { + EINTRLOOP (nwrite, write (to_fd, write_buf, nleft)); + if (nwrite < 0) + { + perror ("write()"); + return; + } + + write_buf += nwrite; + nleft -= nwrite; + } + } +} + +/* Support routine for sync_output() */ +static void * +acquire_semaphore (void) +{ + static struct flock fl; + + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = 0; /* lock just one byte according to pid */ + fl.l_len = 1; + if (fcntl (sync_handle, F_SETLKW, &fl) != -1) + return &fl; + perror ("fcntl()"); + return NULL; +} + +/* Support routine for sync_output() */ +static void +release_semaphore (void *sem) +{ + struct flock *flp = (struct flock *)sem; + flp->l_type = F_UNLCK; + if (fcntl (sync_handle, F_SETLKW, flp) == -1) + perror ("fcntl()"); +} + +/* Synchronize the output of jobs in -j mode to keep the results of + each job together. This is done by holding the results in temp files, + one for stdout and potentially another for stderr, and only releasing + them to "real" stdout/stderr when a semaphore can be obtained. */ + +static void +sync_output (struct child *c) +{ + void *sem; + + int outfd_not_empty = FD_NOT_EMPTY (c->outfd); + int errfd_not_empty = FD_NOT_EMPTY (c->errfd); + + if ((outfd_not_empty || errfd_not_empty) && (sem = acquire_semaphore ())) + { + /* + * We've entered the "critical section" during which a lock is held. + * We want to keep it as short as possible. + */ + if (outfd_not_empty) + { + log_working_directory (1, 1); + pump_from_tmp_fd (c->outfd, fileno (stdout)); + log_working_directory (0, 1); + } + if (errfd_not_empty && c->errfd != c->outfd) + pump_from_tmp_fd (c->errfd, fileno (stderr)); + + /* Exit the critical section */ + release_semaphore (sem); + } + + if (c->outfd >= 0) + close (c->outfd); + if (c->errfd >= 0) + close (c->errfd); + c->outfd = c->errfd = -1; +} +#endif /* PARALLEL_SYNC */ + extern int shell_function_pid, shell_function_completed; /* Reap all dead children, storing the returned status and the new command @@ -833,6 +1021,12 @@ reap_children (int block, int err) c->sh_batch_file = NULL; } +#ifdef PARALLEL_SYNC + /* Synchronize parallel output if requested */ + if (parallel_sync) + sync_output (c); +#endif /* PARALLEL_SYNC */ + /* If this child had the good stdin, say it is now free. */ if (c->good_stdin) good_stdin_used = 0; @@ -1413,6 +1607,45 @@ start_job_command (struct child *child) #else /* !__EMX__ */ +#ifdef PARALLEL_SYNC + if (parallel_sync) + { + static int combined_output; + /* If parallel_sync is turned on, find a resource to + synchronize on. This block is traversed only once. */ + if (sync_handle == -1) + { + struct stat stbuf_o, stbuf_e; + + if (STREAM_OK (stdout)) + { + sync_handle = fileno (stdout); + combined_output = + fstat (fileno (stdout), &stbuf_o) == 0 && + fstat (fileno (stderr), &stbuf_e) == 0 && + stbuf_o.st_dev == stbuf_e.st_dev && + stbuf_o.st_ino == stbuf_e.st_ino; + } + else if (STREAM_OK (stderr)) + sync_handle = fileno (stderr); + else + { + perror_with_name ("parallel-sync suppressed: ", "stderr"); + parallel_sync = 0; + } + } + + /* If it still looks like we can synchronize, create a temp + file to hold stdout (and one for stderr if separate). */ + if (parallel_sync >= PARALLEL_SYNC_COARSE + || (parallel_sync == PARALLEL_SYNC_FINE && !(flags & COMMANDS_RECURSE))) + { + if (!assign_child_tempfiles (child, combined_output)) + parallel_sync = 0; + } + } +#endif /* PARALLEL_SYNC */ + child->pid = vfork (); environ = parent_environ; /* Restore value child may have clobbered. */ if (child->pid == 0) @@ -1436,6 +1669,23 @@ start_job_command (struct child *child) setrlimit (RLIMIT_STACK, &stack_limit); #endif +#ifdef PARALLEL_SYNC + /* Divert child output into tempfile(s) if parallel_sync in use. */ + if (parallel_sync) + { + int outfd = fileno (stdout); + int errfd = fileno (stderr); + + if ((child->outfd >= 0 && + (close (outfd) == -1 || dup2 (child->outfd, outfd) == -1)) + || (child->errfd >= 0 && + (close (errfd) == -1 || dup2 (child->errfd, errfd) == -1))) + { + perror_with_name ("parallel-sync: ", "dup2()"); + } + } +#endif /* PARALLEL_SYNC */ + child_execute_job (child->good_stdin ? 0 : bad_stdin, 1, argv, child->environment); } @@ -1769,6 +2019,9 @@ new_job (struct file *file) c->file = file; c->command_lines = lines; c->sh_batch_file = NULL; +#ifdef PARALLEL_SYNC + c->outfd = c->errfd = -1; +#endif /* Cache dontcare flag because file->dontcare can be changed once we return. Check dontcare inheritance mechanism for details. */ -- cgit v1.2.3