Merge branch 'ps/upload-pack-buffer-more-writes' into jch

Reduce system overhead "git upload-pack" spends relaying "git
pack-objects" output to the "git fetch" running on the other end of
the connection.

* ps/upload-pack-buffer-more-writes:
  builtin/pack-objects: reduce lock contention when writing packfile data
  csum-file: drop `hashfd_throughput()`
  csum-file: introduce `hashfd_ext()`
  sideband: use writev(3p) to send pktlines
  wrapper: introduce writev(3p) wrappers
  compat/posix: introduce writev(3p) wrapper
  git-compat-util: introduce `cast_size_t_to_ssize_t()`
  upload-pack: reduce lock contention when writing packfile data
  upload-pack: adapt keepalives based on buffering
  upload-pack: fix debug statement when flushing packfile data
This commit is contained in:
Junio C Hamano
2026-03-03 11:08:35 -08:00
15 changed files with 217 additions and 41 deletions

View File

@@ -2026,6 +2026,10 @@ ifdef NO_PREAD
COMPAT_CFLAGS += -DNO_PREAD
COMPAT_OBJS += compat/pread.o
endif
ifdef NO_WRITEV
COMPAT_CFLAGS += -DNO_WRITEV
COMPAT_OBJS += compat/writev.o
endif
ifdef NO_FAST_WORKING_DIRECTORY
BASIC_CFLAGS += -DNO_FAST_WORKING_DIRECTORY
endif

View File

@@ -41,6 +41,7 @@
#include "promisor-remote.h"
#include "pack-mtimes.h"
#include "parse-options.h"
#include "pkt-line.h"
#include "blob.h"
#include "tree.h"
#include "path-walk.h"
@@ -1330,11 +1331,25 @@ static void write_pack_file(void)
unsigned char hash[GIT_MAX_RAWSZ];
char *pack_tmp_name = NULL;
if (pack_to_stdout)
f = hashfd_throughput(the_repository->hash_algo, 1,
"<stdout>", progress_state);
else
if (pack_to_stdout) {
/*
* This command is most often invoked via
* git-upload-pack(1), which will typically chunk data
* into pktlines. As such, we use the maximum data
* length of them as buffer length.
*
* Note that we need to subtract one though to
* accommodate the sideband byte.
*/
struct hashfd_options opts = {
.progress = progress_state,
.buffer_len = LARGE_PACKET_DATA_MAX - 1,
};
f = hashfd_ext(the_repository->hash_algo, 1,
"<stdout>", &opts);
} else {
f = create_tmp_packfile(the_repository, &pack_tmp_name);
}
offset = write_pack_header(f, nr_remaining);

View File

@@ -137,6 +137,9 @@
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/statvfs.h>
#ifndef NO_WRITEV
#include <sys/uio.h>
#endif
#include <termios.h>
#ifndef NO_SYS_SELECT_H
#include <sys/select.h>
@@ -323,6 +326,17 @@ int git_lstat(const char *, struct stat *);
ssize_t git_pread(int fd, void *buf, size_t count, off_t offset);
#endif
#ifdef NO_WRITEV
/*
 * Fallback used when the build defines NO_WRITEV. In that configuration
 * <sys/uio.h> is not included above, so we declare our own iovec structure
 * and writev replacement here. The two macros redirect every use of
 * `writev` and `struct iovec` in the code base to the `git_` variants.
 */
#define writev git_writev
#define iovec git_iovec
struct git_iovec {
/* Start of the memory region to write. */
void *iov_base;
/* Number of bytes to write from `iov_base`. */
size_t iov_len;
};
/* Note: `struct iovec` below expands to `struct git_iovec` via the macro. */
ssize_t git_writev(int fd, const struct iovec *iov, int iovcnt);
#endif
#ifdef NO_SETENV
#define setenv gitsetenv
int gitsetenv(const char *, const char *, int);

29
compat/writev.c Normal file
View File

@@ -0,0 +1,29 @@
#include "../git-compat-util.h"
#include "../wrapper.h"
/*
 * Emulate writev() on top of xwrite() by writing the `iovcnt` entries of
 * `iov` to `fd` one after another.
 *
 * Returns the number of bytes written. If xwrite() fails before anything
 * has been written, its negative return value is passed through. If it
 * fails, or reports that it wrote nothing, after some bytes have already
 * been written, the count of bytes written so far is returned instead so
 * that the caller sees a short write.
 */
ssize_t git_writev(int fd, const struct iovec *iov, int iovcnt)
{
	size_t total = 0;
	int idx;

	for (idx = 0; idx < iovcnt; idx++) {
		const char *cursor = iov[idx].iov_base;
		size_t remaining = iov[idx].iov_len;

		while (remaining) {
			ssize_t n = xwrite(fd, cursor, remaining);

			if (n < 0) {
				/* Only report the error if nothing was written at all. */
				if (!total)
					return n;
				return cast_size_t_to_ssize_t(total);
			}
			if (!n)
				return cast_size_t_to_ssize_t(total);

			cursor += n;
			remaining -= n;
			total += n;
		}
	}

	return cast_size_t_to_ssize_t(total);
}

View File

@@ -459,6 +459,7 @@ ifeq ($(uname_S),Windows)
SANE_TOOL_PATH ?= $(msvc_bin_dir_msys)
HAVE_ALLOCA_H = YesPlease
NO_PREAD = YesPlease
NO_WRITEV = YesPlease
NEEDS_CRYPTO_WITH_SSL = YesPlease
NO_LIBGEN_H = YesPlease
NO_POLL = YesPlease
@@ -674,6 +675,7 @@ ifeq ($(uname_S),MINGW)
pathsep = ;
HAVE_ALLOCA_H = YesPlease
NO_PREAD = YesPlease
NO_WRITEV = YesPlease
NEEDS_CRYPTO_WITH_SSL = YesPlease
NO_LIBGEN_H = YesPlease
NO_POLL = YesPlease

View File

@@ -161,17 +161,16 @@ struct hashfile *hashfd_check(const struct git_hash_algo *algop,
return f;
}
static struct hashfile *hashfd_internal(const struct git_hash_algo *algop,
int fd, const char *name,
struct progress *tp,
size_t buffer_len)
struct hashfile *hashfd_ext(const struct git_hash_algo *algop,
int fd, const char *name,
const struct hashfd_options *opts)
{
struct hashfile *f = xmalloc(sizeof(*f));
f->fd = fd;
f->check_fd = -1;
f->offset = 0;
f->total = 0;
f->tp = tp;
f->tp = opts->progress;
f->name = name;
f->do_crc = 0;
f->skip_hash = 0;
@@ -179,8 +178,8 @@ static struct hashfile *hashfd_internal(const struct git_hash_algo *algop,
f->algop = unsafe_hash_algo(algop);
f->algop->init_fn(&f->ctx);
f->buffer_len = buffer_len;
f->buffer = xmalloc(buffer_len);
f->buffer_len = opts->buffer_len ? opts->buffer_len : 128 * 1024;
f->buffer = xmalloc(f->buffer_len);
f->check_buffer = NULL;
return f;
@@ -194,19 +193,8 @@ struct hashfile *hashfd(const struct git_hash_algo *algop,
* measure the rate of data passing through this hashfile,
* use a larger buffer size to reduce fsync() calls.
*/
return hashfd_internal(algop, fd, name, NULL, 128 * 1024);
}
struct hashfile *hashfd_throughput(const struct git_hash_algo *algop,
int fd, const char *name, struct progress *tp)
{
/*
* Since we are expecting to report progress of the
* write into this hashfile, use a smaller buffer
* size so the progress indicators arrive at a more
* frequent rate.
*/
return hashfd_internal(algop, fd, name, tp, 8 * 1024);
struct hashfd_options opts = { 0 };
return hashfd_ext(algop, fd, name, &opts);
}
void hashfile_checkpoint_init(struct hashfile *f,

View File

@@ -45,12 +45,24 @@ int hashfile_truncate(struct hashfile *, struct hashfile_checkpoint *);
#define CSUM_FSYNC 2
#define CSUM_HASH_IN_STREAM 4
/*
 * Optional settings accepted by hashfd_ext(). A zero-initialized struct
 * selects the defaults, which is what hashfd() passes.
 */
struct hashfd_options {
/*
 * Throughput progress that counts the number of bytes that have been
 * hashed.
 */
struct progress *progress;
/*
 * The length in bytes of the buffer allocated for the hashfile. When
 * set to zero, hashfd_ext() falls back to a default of 128 KiB.
 */
size_t buffer_len;
};
struct hashfile *hashfd_ext(const struct git_hash_algo *algop,
int fd, const char *name,
const struct hashfd_options *opts);
struct hashfile *hashfd(const struct git_hash_algo *algop,
int fd, const char *name);
struct hashfile *hashfd_check(const struct git_hash_algo *algop,
const char *name);
struct hashfile *hashfd_throughput(const struct git_hash_algo *algop,
int fd, const char *name, struct progress *tp);
/*
* Free the hashfile without flushing its contents to disk. This only

View File

@@ -665,6 +665,14 @@ static inline int cast_size_t_to_int(size_t a)
return (int)a;
}
/*
 * Convert a size_t to ssize_t, dying with an error message if the value
 * does not fit into the signed type.
 */
static inline ssize_t cast_size_t_to_ssize_t(size_t a)
{
	const int fits = a <= maximum_signed_value_of_type(ssize_t);

	if (!fits)
		die("number too large to represent as ssize_t on this platform: %"PRIuMAX,
		    (uintmax_t)a);

	return (ssize_t)a;
}
static inline uint64_t u64_mult(uint64_t a, uint64_t b)
{
if (unsigned_mult_overflows(a, b))

View File

@@ -1416,6 +1416,7 @@ checkfuncs = {
'initgroups' : [],
'strtoumax' : ['strtoumax.c', 'strtoimax.c'],
'pread' : ['pread.c'],
'writev' : ['writev.c'],
}
if host_machine.system() == 'windows'

View File

@@ -443,6 +443,7 @@ void send_sideband(int fd, int band, const char *data, ssize_t sz, int packet_ma
const char *p = data;
while (sz) {
struct iovec iov[2];
unsigned n;
char hdr[5];
@@ -452,12 +453,19 @@ void send_sideband(int fd, int band, const char *data, ssize_t sz, int packet_ma
if (0 <= band) {
xsnprintf(hdr, sizeof(hdr), "%04x", n + 5);
hdr[4] = band;
write_or_die(fd, hdr, 5);
iov[0].iov_base = hdr;
iov[0].iov_len = 5;
} else {
xsnprintf(hdr, sizeof(hdr), "%04x", n + 4);
write_or_die(fd, hdr, 4);
iov[0].iov_base = hdr;
iov[0].iov_len = 4;
}
write_or_die(fd, p, n);
iov[1].iov_base = (void *) p;
iov[1].iov_len = n;
writev_or_die(fd, iov, ARRAY_SIZE(iov));
p += n;
sz -= n;
}

View File

@@ -29,6 +29,7 @@
#include "commit-graph.h"
#include "commit-reach.h"
#include "shallow.h"
#include "trace.h"
#include "write-or-die.h"
#include "json-writer.h"
#include "strmap.h"
@@ -218,7 +219,8 @@ struct output_state {
};
static int relay_pack_data(int pack_objects_out, struct output_state *os,
int use_sideband, int write_packfile_line)
int use_sideband, int write_packfile_line,
bool *did_send_data)
{
/*
* We keep the last byte to ourselves
@@ -232,6 +234,8 @@ static int relay_pack_data(int pack_objects_out, struct output_state *os,
*/
ssize_t readsz;
*did_send_data = false;
readsz = xread(pack_objects_out, os->buffer + os->used,
sizeof(os->buffer) - os->used);
if (readsz < 0) {
@@ -247,6 +251,7 @@ static int relay_pack_data(int pack_objects_out, struct output_state *os,
if (os->packfile_uris_started)
packet_delim(1);
packet_write_fmt(1, "\1packfile\n");
*did_send_data = true;
}
break;
}
@@ -259,6 +264,7 @@ static int relay_pack_data(int pack_objects_out, struct output_state *os,
}
*p = '\0';
packet_write_fmt(1, "\1%s\n", os->buffer);
*did_send_data = true;
os->used -= p - os->buffer + 1;
memmove(os->buffer, p + 1, os->used);
@@ -270,6 +276,13 @@ static int relay_pack_data(int pack_objects_out, struct output_state *os,
}
}
/*
* Make sure that we buffer some data before sending it to the client.
* This significantly reduces the number of write(3p) syscalls.
*/
if (readsz && os->used < (sizeof(os->buffer) * 2 / 3))
return readsz;
if (os->used > 1) {
send_client_data(1, os->buffer, os->used - 1, use_sideband);
os->buffer[0] = os->buffer[os->used - 1];
@@ -279,6 +292,7 @@ static int relay_pack_data(int pack_objects_out, struct output_state *os,
os->used = 0;
}
*did_send_data = true;
return readsz;
}
@@ -290,6 +304,7 @@ static void create_pack_file(struct upload_pack_data *pack_data,
char progress[128];
char abort_msg[] = "aborting due to possible repository "
"corruption on the remote side.";
uint64_t last_sent_ms = 0;
ssize_t sz;
int i;
FILE *pipe_fd;
@@ -365,10 +380,14 @@ static void create_pack_file(struct upload_pack_data *pack_data,
*/
while (1) {
uint64_t now_ms = getnanotime() / 1000000;
struct pollfd pfd[2];
int pe, pu, pollsize, polltimeout;
int pe, pu, pollsize, polltimeout_ms;
int ret;
if (!last_sent_ms)
last_sent_ms = now_ms;
reset_timeout(pack_data->timeout);
pollsize = 0;
@@ -390,11 +409,21 @@ static void create_pack_file(struct upload_pack_data *pack_data,
if (!pollsize)
break;
polltimeout = pack_data->keepalive < 0
? -1
: 1000 * pack_data->keepalive;
if (pack_data->keepalive < 0) {
polltimeout_ms = -1;
} else {
/*
* The polling timeout needs to be adjusted based on
* the time at which we last sent a packet. The longer
* ago that was, the shorter the timeout becomes until
* we eventually don't block at all.
*/
polltimeout_ms = 1000 * pack_data->keepalive - (now_ms - last_sent_ms);
if (polltimeout_ms < 0)
polltimeout_ms = 0;
}
ret = poll(pfd, pollsize, polltimeout);
ret = poll(pfd, pollsize, polltimeout_ms);
if (ret < 0) {
if (errno != EINTR) {
@@ -403,16 +432,18 @@ static void create_pack_file(struct upload_pack_data *pack_data,
}
continue;
}
if (0 <= pe && (pfd[pe].revents & (POLLIN|POLLHUP))) {
/* Status ready; we ship that in the side-band
* or dump to the standard error.
*/
sz = xread(pack_objects.err, progress,
sizeof(progress));
if (0 < sz)
if (0 < sz) {
send_client_data(2, progress, sz,
pack_data->use_sideband);
else if (sz == 0) {
last_sent_ms = now_ms;
} else if (sz == 0) {
close(pack_objects.err);
pack_objects.err = -1;
}
@@ -421,11 +452,14 @@ static void create_pack_file(struct upload_pack_data *pack_data,
/* give priority to status messages */
continue;
}
if (0 <= pu && (pfd[pu].revents & (POLLIN|POLLHUP))) {
bool did_send_data;
int result = relay_pack_data(pack_objects.out,
output_state,
pack_data->use_sideband,
!!uri_protocols);
!!uri_protocols,
&did_send_data);
if (result == 0) {
close(pack_objects.out);
@@ -433,6 +467,9 @@ static void create_pack_file(struct upload_pack_data *pack_data,
} else if (result < 0) {
goto fail;
}
if (did_send_data)
last_sent_ms = now_ms;
}
/*
@@ -448,6 +485,7 @@ static void create_pack_file(struct upload_pack_data *pack_data,
if (!ret && pack_data->use_sideband) {
static const char buf[] = "0005\1";
write_or_die(1, buf, 5);
last_sent_ms = now_ms;
}
}
@@ -457,11 +495,9 @@ static void create_pack_file(struct upload_pack_data *pack_data,
}
/* flush the data */
if (output_state->used > 0) {
if (output_state->used > 0)
send_client_data(1, output_state->buffer, output_state->used,
pack_data->use_sideband);
fprintf(stderr, "flushed.\n");
}
free(output_state);
if (pack_data->use_sideband)
packet_flush(1);

View File

@@ -323,6 +323,47 @@ ssize_t write_in_full(int fd, const void *buf, size_t count)
return total;
}
/*
 * Write the contents of all `iovcnt` entries of `iov` to `fd`, calling
 * writev() repeatedly until everything has been written.
 *
 * Returns the total number of bytes written on success, which is 0 when
 * there is nothing to write (no entries, or only empty entries). Returns -1
 * with errno set when writev() fails with anything other than EINTR or
 * EAGAIN, and -1 with errno set to ENOSPC when writev() reports that it
 * wrote nothing even though there was data left to write.
 *
 * The iovec array is modified in place to keep track of partial writes, so
 * callers must not rely on its contents afterwards.
 */
ssize_t writev_in_full(int fd, struct iovec *iov, int iovcnt)
{
	ssize_t total_written = 0;

	/*
	 * Skip leading empty entries. Without this, a request whose entries
	 * are all empty would make writev() return 0, which the loop below
	 * would misreport as ENOSPC even though nothing needed writing.
	 * Empty entries further down the array are consumed by the discard
	 * loop after each write, so handling them once up front suffices.
	 */
	while (iovcnt > 0 && !iov->iov_len) {
		iov++;
		iovcnt--;
	}

	while (iovcnt) {
		ssize_t bytes_written = writev(fd, iov, iovcnt);

		if (bytes_written < 0) {
			if (errno == EINTR || errno == EAGAIN)
				continue;
			return -1;
		}
		if (!bytes_written) {
			errno = ENOSPC;
			return -1;
		}

		total_written += bytes_written;

		/*
		 * We first need to discard any iovec entries that have been
		 * fully written.
		 */
		while (iovcnt && (size_t)bytes_written >= iov->iov_len) {
			bytes_written -= iov->iov_len;
			iov++;
			iovcnt--;
		}

		/*
		 * Finally, we need to adjust the first remaining iovec in
		 * case we have performed a partial write.
		 */
		if (iovcnt && bytes_written) {
			iov->iov_base = (char *) iov->iov_base + bytes_written;
			iov->iov_len -= bytes_written;
		}
	}

	return total_written;
}
ssize_t pread_in_full(int fd, void *buf, size_t count, off_t offset)
{
char *p = buf;

View File

@@ -47,6 +47,15 @@ ssize_t read_in_full(int fd, void *buf, size_t count);
ssize_t write_in_full(int fd, const void *buf, size_t count);
ssize_t pread_in_full(int fd, void *buf, size_t count, off_t offset);
/*
* Try to write all iovecs. Returns -1 in case an error occurred with a proper
* errno set, the number of bytes written otherwise.
*
* Note that the iovec will be modified as a result of this call to adjust for
* partial writes!
*/
ssize_t writev_in_full(int fd, struct iovec *iov, int iovcnt);
static inline ssize_t write_str_in_full(int fd, const char *str)
{
return write_in_full(fd, str, strlen(str));

View File

@@ -96,6 +96,14 @@ void write_or_die(int fd, const void *buf, size_t count)
}
}
/*
 * Write all `iovlen` entries of `iov` to `fd` via writev_in_full(). If that
 * fails, hand errno to check_pipe() and then die with "writev error".
 * The iovec array is passed through to writev_in_full() as-is, which may
 * modify it.
 */
void writev_or_die(int fd, struct iovec *iov, int iovlen)
{
	if (writev_in_full(fd, iov, iovlen) >= 0)
		return;

	check_pipe(errno);
	die_errno("writev error");
}
void fwrite_or_die(FILE *f, const void *buf, size_t count)
{
if (fwrite(buf, 1, count, f) != count)

View File

@@ -7,6 +7,7 @@ void fprintf_or_die(FILE *, const char *fmt, ...);
void fwrite_or_die(FILE *f, const void *buf, size_t count);
void fflush_or_die(FILE *f);
void write_or_die(int fd, const void *buf, size_t count);
void writev_or_die(int fd, struct iovec *iov, int iovlen);
/*
* These values are used to help identify parts of a repository to fsync.