streaming: refactor interface to be object-database-centric

Refactor the streaming interface to be centered around object databases
instead of centered around the repository. Rename the functions
accordingly.

Signed-off-by: Patrick Steinhardt <ps@pks.im>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
Patrick Steinhardt
2025-11-23 19:59:42 +01:00
committed by Junio C Hamano
parent 8c1b84bc97
commit 378ec56beb
7 changed files with 71 additions and 51 deletions

View File

@@ -135,16 +135,16 @@ static int stream_blocked(struct repository *r, const struct object_id *oid)
char buf[BLOCKSIZE];
ssize_t readlen;
st = open_istream(r, oid, &type, &sz, NULL);
st = odb_read_stream_open(r->objects, oid, &type, &sz, NULL);
if (!st)
return error(_("cannot stream blob %s"), oid_to_hex(oid));
for (;;) {
readlen = read_istream(st, buf, sizeof(buf));
readlen = odb_read_stream_read(st, buf, sizeof(buf));
if (readlen <= 0)
break;
do_write_blocked(buf, readlen);
}
close_istream(st);
odb_read_stream_close(st);
if (!readlen)
finish_record();
return readlen;

View File

@@ -348,8 +348,8 @@ static int write_zip_entry(struct archiver_args *args,
if (!buffer) {
enum object_type type;
stream = open_istream(args->repo, oid, &type, &size,
NULL);
stream = odb_read_stream_open(args->repo->objects, oid,
&type, &size, NULL);
if (!stream)
return error(_("cannot stream blob %s"),
oid_to_hex(oid));
@@ -429,7 +429,7 @@ static int write_zip_entry(struct archiver_args *args,
ssize_t readlen;
for (;;) {
readlen = read_istream(stream, buf, sizeof(buf));
readlen = odb_read_stream_read(stream, buf, sizeof(buf));
if (readlen <= 0)
break;
crc = crc32(crc, buf, readlen);
@@ -439,7 +439,7 @@ static int write_zip_entry(struct archiver_args *args,
buf, readlen);
write_or_die(1, buf, readlen);
}
close_istream(stream);
odb_read_stream_close(stream);
if (readlen)
return readlen;
@@ -462,7 +462,7 @@ static int write_zip_entry(struct archiver_args *args,
zstream.avail_out = sizeof(compressed);
for (;;) {
readlen = read_istream(stream, buf, sizeof(buf));
readlen = odb_read_stream_read(stream, buf, sizeof(buf));
if (readlen <= 0)
break;
crc = crc32(crc, buf, readlen);
@@ -486,7 +486,7 @@ static int write_zip_entry(struct archiver_args *args,
}
}
close_istream(stream);
odb_read_stream_close(stream);
if (readlen)
return readlen;

View File

@@ -779,7 +779,7 @@ static int compare_objects(const unsigned char *buf, unsigned long size,
}
while (size) {
ssize_t len = read_istream(data->st, data->buf, size);
ssize_t len = odb_read_stream_read(data->st, data->buf, size);
if (len == 0)
die(_("SHA1 COLLISION FOUND WITH %s !"),
oid_to_hex(&data->entry->idx.oid));
@@ -807,15 +807,15 @@ static int check_collison(struct object_entry *entry)
memset(&data, 0, sizeof(data));
data.entry = entry;
data.st = open_istream(the_repository, &entry->idx.oid, &type, &size,
NULL);
data.st = odb_read_stream_open(the_repository->objects, &entry->idx.oid,
&type, &size, NULL);
if (!data.st)
return -1;
if (size != entry->size || type != entry->type)
die(_("SHA1 COLLISION FOUND WITH %s !"),
oid_to_hex(&entry->idx.oid));
unpack_data(entry, compare_objects, &data);
close_istream(data.st);
odb_read_stream_close(data.st);
free(data.buf);
return 0;
}

View File

@@ -417,7 +417,7 @@ static unsigned long write_large_blob_data(struct odb_read_stream *st, struct ha
for (;;) {
ssize_t readlen;
int zret = Z_OK;
readlen = read_istream(st, ibuf, sizeof(ibuf));
readlen = odb_read_stream_read(st, ibuf, sizeof(ibuf));
if (readlen == -1)
die(_("unable to read %s"), oid_to_hex(oid));
@@ -520,8 +520,8 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent
if (oe_type(entry) == OBJ_BLOB &&
oe_size_greater_than(&to_pack, entry,
repo_settings_get_big_file_threshold(the_repository)) &&
(st = open_istream(the_repository, &entry->idx.oid, &type,
&size, NULL)) != NULL)
(st = odb_read_stream_open(the_repository->objects, &entry->idx.oid,
&type, &size, NULL)) != NULL)
buf = NULL;
else {
buf = odb_read_object(the_repository->objects,
@@ -577,7 +577,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent
dheader[--pos] = 128 | (--ofs & 127);
if (limit && hdrlen + sizeof(dheader) - pos + datalen + hashsz >= limit) {
if (st)
close_istream(st);
odb_read_stream_close(st);
free(buf);
return 0;
}
@@ -591,7 +591,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent
*/
if (limit && hdrlen + hashsz + datalen + hashsz >= limit) {
if (st)
close_istream(st);
odb_read_stream_close(st);
free(buf);
return 0;
}
@@ -601,7 +601,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent
} else {
if (limit && hdrlen + datalen + hashsz >= limit) {
if (st)
close_istream(st);
odb_read_stream_close(st);
free(buf);
return 0;
}
@@ -609,7 +609,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent
}
if (st) {
datalen = write_large_blob_data(st, f, &entry->idx.oid);
close_istream(st);
odb_read_stream_close(st);
} else {
hashwrite(f, buf, datalen);
free(buf);

View File

@@ -139,7 +139,7 @@ int stream_object_signature(struct repository *r, const struct object_id *oid)
char hdr[MAX_HEADER_LEN];
int hdrlen;
st = open_istream(r, oid, &obj_type, &size, NULL);
st = odb_read_stream_open(r->objects, oid, &obj_type, &size, NULL);
if (!st)
return -1;
@@ -151,10 +151,10 @@ int stream_object_signature(struct repository *r, const struct object_id *oid)
git_hash_update(&c, hdr, hdrlen);
for (;;) {
char buf[1024 * 16];
ssize_t readlen = read_istream(st, buf, sizeof(buf));
ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf));
if (readlen < 0) {
close_istream(st);
odb_read_stream_close(st);
return -1;
}
if (!readlen)
@@ -162,7 +162,7 @@ int stream_object_signature(struct repository *r, const struct object_id *oid)
git_hash_update(&c, buf, readlen);
}
git_hash_final_oid(&real_oid, &c);
close_istream(st);
odb_read_stream_close(st);
return !oideq(oid, &real_oid) ? -1 : 0;
}

View File

@@ -35,7 +35,7 @@ static int close_istream_filtered(struct odb_read_stream *_fs)
{
struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs;
free_stream_filter(fs->filter);
return close_istream(fs->upstream);
return odb_read_stream_close(fs->upstream);
}
static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf,
@@ -87,7 +87,7 @@ static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf,
/* refill the input from the upstream */
if (!fs->input_finished) {
fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
fs->i_end = odb_read_stream_read(fs->upstream, fs->ibuf, FILTER_BUFFER);
if (fs->i_end < 0)
return -1;
if (fs->i_end)
@@ -149,7 +149,7 @@ static ssize_t read_istream_incore(struct odb_read_stream *_st, char *buf, size_
}
static int open_istream_incore(struct odb_read_stream **out,
struct repository *r,
struct object_database *odb,
const struct object_id *oid)
{
struct object_info oi = OBJECT_INFO_INIT;
@@ -163,7 +163,7 @@ static int open_istream_incore(struct odb_read_stream **out,
oi.typep = &stream.base.type;
oi.sizep = &stream.base.size;
oi.contentp = (void **)&stream.buf;
ret = odb_read_object_info_extended(r->objects, oid, &oi,
ret = odb_read_object_info_extended(odb, oid, &oi,
OBJECT_INFO_DIE_IF_CORRUPT);
if (ret)
return ret;
@@ -180,47 +180,47 @@ static int open_istream_incore(struct odb_read_stream **out,
*****************************************************************************/
static int istream_source(struct odb_read_stream **out,
struct repository *r,
struct object_database *odb,
const struct object_id *oid)
{
struct odb_source *source;
if (!packfile_store_read_object_stream(out, r->objects->packfiles, oid))
if (!packfile_store_read_object_stream(out, odb->packfiles, oid))
return 0;
odb_prepare_alternates(r->objects);
for (source = r->objects->sources; source; source = source->next)
odb_prepare_alternates(odb);
for (source = odb->sources; source; source = source->next)
if (!odb_source_loose_read_object_stream(out, source, oid))
return 0;
return open_istream_incore(out, r, oid);
return open_istream_incore(out, odb, oid);
}
/****************************************************************
* Users of streaming interface
****************************************************************/
int close_istream(struct odb_read_stream *st)
int odb_read_stream_close(struct odb_read_stream *st)
{
int r = st->close(st);
free(st);
return r;
}
ssize_t read_istream(struct odb_read_stream *st, void *buf, size_t sz)
ssize_t odb_read_stream_read(struct odb_read_stream *st, void *buf, size_t sz)
{
return st->read(st, buf, sz);
}
struct odb_read_stream *open_istream(struct repository *r,
const struct object_id *oid,
enum object_type *type,
unsigned long *size,
struct stream_filter *filter)
struct odb_read_stream *odb_read_stream_open(struct object_database *odb,
const struct object_id *oid,
enum object_type *type,
unsigned long *size,
struct stream_filter *filter)
{
struct odb_read_stream *st;
const struct object_id *real = lookup_replace_object(r, oid);
int ret = istream_source(&st, r, real);
const struct object_id *real = lookup_replace_object(odb->repo, oid);
int ret = istream_source(&st, odb, real);
if (ret)
return NULL;
@@ -229,7 +229,7 @@ struct odb_read_stream *open_istream(struct repository *r,
/* Add "&& !is_null_stream_filter(filter)" for performance */
struct odb_read_stream *nst = attach_stream_filter(st, filter);
if (!nst) {
close_istream(st);
odb_read_stream_close(st);
return NULL;
}
st = nst;
@@ -252,7 +252,7 @@ int odb_stream_blob_to_fd(struct object_database *odb,
ssize_t kept = 0;
int result = -1;
st = open_istream(odb->repo, oid, &type, &sz, filter);
st = odb_read_stream_open(odb, oid, &type, &sz, filter);
if (!st) {
if (filter)
free_stream_filter(filter);
@@ -263,7 +263,7 @@ int odb_stream_blob_to_fd(struct object_database *odb,
for (;;) {
char buf[1024 * 16];
ssize_t wrote, holeto;
ssize_t readlen = read_istream(st, buf, sizeof(buf));
ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf));
if (readlen < 0)
goto close_and_exit;
@@ -294,6 +294,6 @@ int odb_stream_blob_to_fd(struct object_database *odb,
result = 0;
close_and_exit:
close_istream(st);
odb_read_stream_close(st);
return result;
}

View File

@@ -24,11 +24,31 @@ struct odb_read_stream {
unsigned long size; /* inflated size of full object */
};
struct odb_read_stream *open_istream(struct repository *, const struct object_id *,
enum object_type *, unsigned long *,
struct stream_filter *);
int close_istream(struct odb_read_stream *);
ssize_t read_istream(struct odb_read_stream *, void *, size_t);
/*
* Create a new object stream for the given object database. Populates the type
* and size pointers with the object's info. An optional filter can be used to
* transform the object's content.
*
* Returns the stream on success, a `NULL` pointer otherwise.
*/
struct odb_read_stream *odb_read_stream_open(struct object_database *odb,
const struct object_id *oid,
enum object_type *type,
unsigned long *size,
struct stream_filter *filter);
/*
* Close the given read stream and release all resources associated with it.
* Returns 0 on success, a negative error code otherwise.
*/
int odb_read_stream_close(struct odb_read_stream *stream);
/*
* Read data from the stream into the buffer. Returns 0 on EOF and the number
* of bytes read on success. Returns a negative error code in case reading from
* the stream fails.
*/
ssize_t odb_read_stream_read(struct odb_read_stream *stream, void *buf, size_t len);
/*
* Look up the object by its ID and write the full contents to the file