streaming: allocate stream inside the backend-specific logic

When creating a new stream we first allocate it and then call into
backend-specific logic to populate the stream. This design requires that
the stream itself contains a `union` with backend-specific members that
then ultimately get populated by the backend-specific logic.

This works, but it's awkward in the context of pluggable object
databases. Each backend will need its own member in that union, and as
the structure itself is completely opaque (it's only defined in
"streaming.c") it also has the consequence that we must have the logic
that is specific to backends in "streaming.c".

Ideally though, the infrastructure would be reversed: we have a generic
`struct odb_read_stream` and some helper functions in "streaming.c",
whereas the backend-specific logic sits in the backend's subsystem
itself.

This can be realized by using a design that is similar to how we handle
reference databases: instead of having a union of members, we instead
have backend-specific structures with a `struct odb_read_stream base`
as its first member. The backends would thus hand out the pointer to the
base, but internally they know to cast back to the backend-specific
type.

This means though that we need to allocate different structures
depending on the backend. To prepare for this, move allocation of the
structure into the backend-specific functions that open a new stream.
Subsequent commits will then create those new backend-specific structs.

Signed-off-by: Patrick Steinhardt <ps@pks.im>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
Patrick Steinhardt
2025-11-23 19:59:30 +01:00
committed by Junio C Hamano
parent 3c7722dd4d
commit 595296e124

View File

@@ -222,27 +222,34 @@ static int close_istream_loose(struct odb_read_stream *st)
return 0;
}
static int open_istream_loose(struct odb_read_stream *st, struct repository *r,
static int open_istream_loose(struct odb_read_stream **out,
struct repository *r,
const struct object_id *oid)
{
struct object_info oi = OBJECT_INFO_INIT;
struct odb_read_stream *st;
struct odb_source *source;
oi.sizep = &st->size;
oi.typep = &st->type;
unsigned long mapsize;
void *mapped;
odb_prepare_alternates(r->objects);
for (source = r->objects->sources; source; source = source->next) {
st->u.loose.mapped = odb_source_loose_map_object(source, oid,
&st->u.loose.mapsize);
if (st->u.loose.mapped)
mapped = odb_source_loose_map_object(source, oid, &mapsize);
if (mapped)
break;
}
if (!st->u.loose.mapped)
if (!mapped)
return -1;
switch (unpack_loose_header(&st->z, st->u.loose.mapped,
st->u.loose.mapsize, st->u.loose.hdr,
/*
* Note: we must allocate this structure early even though we may still
* fail. This is because we need to initialize the zlib stream, and it
* is not possible to copy the stream around after the fact because it
* has self-referencing pointers.
*/
CALLOC_ARRAY(st, 1);
switch (unpack_loose_header(&st->z, mapped, mapsize, st->u.loose.hdr,
sizeof(st->u.loose.hdr))) {
case ULHR_OK:
break;
@@ -250,19 +257,28 @@ static int open_istream_loose(struct odb_read_stream *st, struct repository *r,
case ULHR_TOO_LONG:
goto error;
}
oi.sizep = &st->size;
oi.typep = &st->type;
if (parse_loose_header(st->u.loose.hdr, &oi) < 0 || st->type < 0)
goto error;
st->u.loose.mapped = mapped;
st->u.loose.mapsize = mapsize;
st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
st->u.loose.hdr_avail = st->z.total_out;
st->z_state = z_used;
st->close = close_istream_loose;
st->read = read_istream_loose;
*out = st;
return 0;
error:
git_inflate_end(&st->z);
munmap(st->u.loose.mapped, st->u.loose.mapsize);
free(st);
return -1;
}
@@ -338,12 +354,16 @@ static int close_istream_pack_non_delta(struct odb_read_stream *st)
return 0;
}
static int open_istream_pack_non_delta(struct odb_read_stream *st,
static int open_istream_pack_non_delta(struct odb_read_stream **out,
struct repository *r UNUSED,
const struct object_id *oid UNUSED,
struct packed_git *pack,
off_t offset)
{
struct odb_read_stream stream = {
.close = close_istream_pack_non_delta,
.read = read_istream_pack_non_delta,
};
struct pack_window *window;
enum object_type in_pack_type;
@@ -352,7 +372,7 @@ static int open_istream_pack_non_delta(struct odb_read_stream *st,
in_pack_type = unpack_object_header(pack,
&window,
&offset,
&st->size);
&stream.size);
unuse_pack(&window);
switch (in_pack_type) {
default:
@@ -363,12 +383,13 @@ static int open_istream_pack_non_delta(struct odb_read_stream *st,
case OBJ_TAG:
break;
}
st->type = in_pack_type;
st->z_state = z_unused;
st->close = close_istream_pack_non_delta;
st->read = read_istream_pack_non_delta;
st->u.in_pack.pack = pack;
st->u.in_pack.pos = offset;
stream.type = in_pack_type;
stream.z_state = z_unused;
stream.u.in_pack.pack = pack;
stream.u.in_pack.pos = offset;
CALLOC_ARRAY(*out, 1);
**out = stream;
return 0;
}
@@ -400,27 +421,35 @@ static ssize_t read_istream_incore(struct odb_read_stream *st, char *buf, size_t
return read_size;
}
static int open_istream_incore(struct odb_read_stream *st, struct repository *r,
static int open_istream_incore(struct odb_read_stream **out,
struct repository *r,
const struct object_id *oid)
{
struct object_info oi = OBJECT_INFO_INIT;
struct odb_read_stream stream = {
.close = close_istream_incore,
.read = read_istream_incore,
};
int ret;
st->u.incore.read_ptr = 0;
st->close = close_istream_incore;
st->read = read_istream_incore;
oi.typep = &stream.type;
oi.sizep = &stream.size;
oi.contentp = (void **)&stream.u.incore.buf;
ret = odb_read_object_info_extended(r->objects, oid, &oi,
OBJECT_INFO_DIE_IF_CORRUPT);
if (ret)
return ret;
oi.typep = &st->type;
oi.sizep = &st->size;
oi.contentp = (void **)&st->u.incore.buf;
return odb_read_object_info_extended(r->objects, oid, &oi,
OBJECT_INFO_DIE_IF_CORRUPT);
CALLOC_ARRAY(*out, 1);
**out = stream;
return 0;
}
/*****************************************************************************
* static helpers variables and functions for users of streaming interface
*****************************************************************************/
static int istream_source(struct odb_read_stream *st,
static int istream_source(struct odb_read_stream **out,
struct repository *r,
const struct object_id *oid)
{
@@ -435,13 +464,13 @@ static int istream_source(struct odb_read_stream *st,
switch (oi.whence) {
case OI_LOOSE:
if (open_istream_loose(st, r, oid) < 0)
if (open_istream_loose(out, r, oid) < 0)
break;
return 0;
case OI_PACKED:
if (oi.u.packed.is_delta ||
repo_settings_get_big_file_threshold(the_repository) >= size ||
open_istream_pack_non_delta(st, r, oid, oi.u.packed.pack,
open_istream_pack_non_delta(out, r, oid, oi.u.packed.pack,
oi.u.packed.offset) < 0)
break;
return 0;
@@ -449,7 +478,7 @@ static int istream_source(struct odb_read_stream *st,
break;
}
return open_istream_incore(st, r, oid);
return open_istream_incore(out, r, oid);
}
/****************************************************************
@@ -474,14 +503,12 @@ struct odb_read_stream *open_istream(struct repository *r,
unsigned long *size,
struct stream_filter *filter)
{
struct odb_read_stream *st = xmalloc(sizeof(*st));
struct odb_read_stream *st;
const struct object_id *real = lookup_replace_object(r, oid);
int ret = istream_source(st, r, real);
int ret = istream_source(&st, r, real);
if (ret) {
free(st);
if (ret)
return NULL;
}
if (filter) {
/* Add "&& !is_null_stream_filter(filter)" for performance */