Merge branch 'hx/unpack-streaming'

Allow large objects read from a packstream to be streamed into a
loose object file straight, without having to keep it in-core as a
whole.

* hx/unpack-streaming:
  unpack-objects: use stream_loose_object() to unpack large objects
  core doc: modernize core.bigFileThreshold documentation
  object-file.c: add "stream_loose_object()" to handle large object
  object-file.c: factor out deflate part of write_loose_object()
  object-file.c: refactor write_loose_object() to several steps
  unpack-objects: low memory footprint for get_data() in dry_run mode
This commit is contained in:
Junio C Hamano
2022-07-14 15:03:59 -07:00
5 changed files with 405 additions and 51 deletions

View File

@ -1951,6 +1951,96 @@ static int create_tmpfile(struct strbuf *tmp, const char *filename)
return fd;
}
/**
* Common steps for loose object writers to start writing loose
* objects:
*
* - Create tmpfile for the loose object.
* - Setup zlib stream for compression.
* - Start to feed header to zlib stream.
*
* Returns a "fd", which should later be provided to
* end_loose_object_common().
*/
static int start_loose_object_common(struct strbuf *tmp_file,
const char *filename, unsigned flags,
git_zstream *stream,
unsigned char *buf, size_t buflen,
git_hash_ctx *c,
char *hdr, int hdrlen)
{
int fd;
fd = create_tmpfile(tmp_file, filename);
if (fd < 0) {
if (flags & HASH_SILENT)
return -1;
else if (errno == EACCES)
return error(_("insufficient permission for adding "
"an object to repository database %s"),
get_object_directory());
else
return error_errno(
_("unable to create temporary file"));
}
/* Setup zlib stream for compression */
git_deflate_init(stream, zlib_compression_level);
stream->next_out = buf;
stream->avail_out = buflen;
the_hash_algo->init_fn(c);
/* Start to feed header to zlib stream */
stream->next_in = (unsigned char *)hdr;
stream->avail_in = hdrlen;
while (git_deflate(stream, 0) == Z_OK)
; /* nothing */
the_hash_algo->update_fn(c, hdr, hdrlen);
return fd;
}
/**
* Common steps for the inner git_deflate() loop for writing loose
* objects. Returns what git_deflate() returns.
*/
static int write_loose_object_common(git_hash_ctx *c,
git_zstream *stream, const int flush,
unsigned char *in0, const int fd,
unsigned char *compressed,
const size_t compressed_len)
{
int ret;
ret = git_deflate(stream, flush ? Z_FINISH : 0);
the_hash_algo->update_fn(c, in0, stream->next_in - in0);
if (write_buffer(fd, compressed, stream->next_out - compressed) < 0)
die(_("unable to write loose object file"));
stream->next_out = compressed;
stream->avail_out = compressed_len;
return ret;
}
/**
* Common steps for loose object writers to end writing loose objects:
*
* - End the compression of zlib stream.
* - Get the calculated oid to "oid".
*/
static int end_loose_object_common(git_hash_ctx *c, git_zstream *stream,
struct object_id *oid)
{
int ret;
ret = git_deflate_end_gently(stream);
if (ret != Z_OK)
return ret;
the_hash_algo->final_oid_fn(oid, c);
return Z_OK;
}
static int write_loose_object(const struct object_id *oid, char *hdr,
int hdrlen, const void *buf, unsigned long len,
time_t mtime, unsigned flags)
@ -1968,50 +2058,29 @@ static int write_loose_object(const struct object_id *oid, char *hdr,
loose_object_path(the_repository, &filename, oid);
fd = create_tmpfile(&tmp_file, filename.buf);
if (fd < 0) {
if (flags & HASH_SILENT)
return -1;
else if (errno == EACCES)
return error(_("insufficient permission for adding an object to repository database %s"), get_object_directory());
else
return error_errno(_("unable to create temporary file"));
}
/* Set it up */
git_deflate_init(&stream, zlib_compression_level);
stream.next_out = compressed;
stream.avail_out = sizeof(compressed);
the_hash_algo->init_fn(&c);
/* First header.. */
stream.next_in = (unsigned char *)hdr;
stream.avail_in = hdrlen;
while (git_deflate(&stream, 0) == Z_OK)
; /* nothing */
the_hash_algo->update_fn(&c, hdr, hdrlen);
fd = start_loose_object_common(&tmp_file, filename.buf, flags,
&stream, compressed, sizeof(compressed),
&c, hdr, hdrlen);
if (fd < 0)
return -1;
/* Then the data itself.. */
stream.next_in = (void *)buf;
stream.avail_in = len;
do {
unsigned char *in0 = stream.next_in;
ret = git_deflate(&stream, Z_FINISH);
the_hash_algo->update_fn(&c, in0, stream.next_in - in0);
if (write_buffer(fd, compressed, stream.next_out - compressed) < 0)
die(_("unable to write loose object file"));
stream.next_out = compressed;
stream.avail_out = sizeof(compressed);
ret = write_loose_object_common(&c, &stream, 1, in0, fd,
compressed, sizeof(compressed));
} while (ret == Z_OK);
if (ret != Z_STREAM_END)
die(_("unable to deflate new object %s (%d)"), oid_to_hex(oid),
ret);
ret = git_deflate_end_gently(&stream);
ret = end_loose_object_common(&c, &stream, &parano_oid);
if (ret != Z_OK)
die(_("deflateEnd on object %s failed (%d)"), oid_to_hex(oid),
ret);
the_hash_algo->final_oid_fn(&parano_oid, &c);
if (!oideq(oid, &parano_oid))
die(_("confused by unstable object source data for %s"),
oid_to_hex(oid));
@ -2050,6 +2119,110 @@ static int freshen_packed_object(const struct object_id *oid)
return 1;
}
int stream_loose_object(struct input_stream *in_stream, size_t len,
struct object_id *oid)
{
int fd, ret, err = 0, flush = 0;
unsigned char compressed[4096];
git_zstream stream;
git_hash_ctx c;
struct strbuf tmp_file = STRBUF_INIT;
struct strbuf filename = STRBUF_INIT;
int dirlen;
char hdr[MAX_HEADER_LEN];
int hdrlen;
if (batch_fsync_enabled(FSYNC_COMPONENT_LOOSE_OBJECT))
prepare_loose_object_bulk_checkin();
/* Since oid is not determined, save tmp file to odb path. */
strbuf_addf(&filename, "%s/", get_object_directory());
hdrlen = format_object_header(hdr, sizeof(hdr), OBJ_BLOB, len);
/*
* Common steps for write_loose_object and stream_loose_object to
* start writing loose objects:
*
* - Create tmpfile for the loose object.
* - Setup zlib stream for compression.
* - Start to feed header to zlib stream.
*/
fd = start_loose_object_common(&tmp_file, filename.buf, 0,
&stream, compressed, sizeof(compressed),
&c, hdr, hdrlen);
if (fd < 0) {
err = -1;
goto cleanup;
}
/* Then the data itself.. */
do {
unsigned char *in0 = stream.next_in;
if (!stream.avail_in && !in_stream->is_finished) {
const void *in = in_stream->read(in_stream, &stream.avail_in);
stream.next_in = (void *)in;
in0 = (unsigned char *)in;
/* All data has been read. */
if (in_stream->is_finished)
flush = 1;
}
ret = write_loose_object_common(&c, &stream, flush, in0, fd,
compressed, sizeof(compressed));
/*
* Unlike write_loose_object(), we do not have the entire
* buffer. If we get Z_BUF_ERROR due to too few input bytes,
* then we'll replenish them in the next input_stream->read()
* call when we loop.
*/
} while (ret == Z_OK || ret == Z_BUF_ERROR);
if (stream.total_in != len + hdrlen)
die(_("write stream object %ld != %"PRIuMAX), stream.total_in,
(uintmax_t)len + hdrlen);
/*
* Common steps for write_loose_object and stream_loose_object to
* end writing loose oject:
*
* - End the compression of zlib stream.
* - Get the calculated oid.
*/
if (ret != Z_STREAM_END)
die(_("unable to stream deflate new object (%d)"), ret);
ret = end_loose_object_common(&c, &stream, oid);
if (ret != Z_OK)
die(_("deflateEnd on stream object failed (%d)"), ret);
close_loose_object(fd, tmp_file.buf);
if (freshen_packed_object(oid) || freshen_loose_object(oid)) {
unlink_or_warn(tmp_file.buf);
goto cleanup;
}
loose_object_path(the_repository, &filename, oid);
/* We finally know the object path, and create the missing dir. */
dirlen = directory_size(filename.buf);
if (dirlen) {
struct strbuf dir = STRBUF_INIT;
strbuf_add(&dir, filename.buf, dirlen);
if (mkdir_in_gitdir(dir.buf) && errno != EEXIST) {
err = error_errno(_("unable to create directory %s"), dir.buf);
strbuf_release(&dir);
goto cleanup;
}
strbuf_release(&dir);
}
err = finalize_object_file(tmp_file.buf, filename.buf);
cleanup:
strbuf_release(&tmp_file);
strbuf_release(&filename);
return err;
}
int write_object_file_flags(const void *buf, unsigned long len,
enum object_type type, struct object_id *oid,
unsigned flags)