Merge branch 'jt/threaded-index-pack'

"git index-pack" learned to resolve deltified objects with greater
parallelism.

* jt/threaded-index-pack:
  index-pack: make quantum of work smaller
  index-pack: make resolve_delta() assume base data
  index-pack: calculate {ref,ofs}_{first,last} early
  index-pack: remove redundant child field
  index-pack: unify threaded and unthreaded code
  index-pack: remove redundant parameter
  Documentation: deltaBaseCacheLimit is per-thread
This commit is contained in:
Junio C Hamano
2020-09-22 12:36:28 -07:00
2 changed files with 251 additions and 207 deletions

View File

@ -33,19 +33,61 @@ struct object_stat {
};
struct base_data {
/* Initialized by make_base(). */
struct base_data *base;
struct base_data *child;
struct object_entry *obj;
void *data;
unsigned long size;
int ref_first, ref_last;
int ofs_first, ofs_last;
/*
* Threads should increment retain_data if they are about to call
* patch_delta() using this struct's data as a base, and decrement this
* when they are done. While retain_data is nonzero, this struct's data
* will not be freed even if the delta base cache limit is exceeded.
*/
int retain_data;
/*
* The number of direct children that have not been fully processed
* (entered work_head, entered done_head, left done_head). When this
* number reaches zero, this struct base_data can be freed.
*/
int children_remaining;
/* Not initialized by make_base(). */
struct list_head list;
void *data;
unsigned long size;
};
/*
* Stack of struct base_data that have unprocessed children.
* threaded_second_pass() uses this as a source of work (the other being the
* objects array).
*
* Guarded by work_mutex.
*/
static LIST_HEAD(work_head);
/*
* Stack of struct base_data that have children, all of whom have been
* processed or are being processed, and at least one child is being processed.
* These struct base_data must be kept around until the last child is
* processed.
*
* Guarded by work_mutex.
*/
static LIST_HEAD(done_head);
/*
* All threads share one delta base cache.
*
* base_cache_used is guarded by work_mutex, and base_cache_limit is read-only
* in a thread.
*/
static size_t base_cache_used;
static size_t base_cache_limit;
struct thread_local {
pthread_t thread;
struct base_data *base_cache;
size_t base_cache_used;
int pack_fd;
};
@ -364,56 +406,42 @@ static void set_thread_data(struct thread_local *data)
pthread_setspecific(key, data);
}
static struct base_data *alloc_base_data(void)
{
struct base_data *base = xcalloc(1, sizeof(struct base_data));
base->ref_last = -1;
base->ofs_last = -1;
return base;
}
static void free_base_data(struct base_data *c)
{
if (c->data) {
FREE_AND_NULL(c->data);
get_thread_data()->base_cache_used -= c->size;
base_cache_used -= c->size;
}
}
static void prune_base_data(struct base_data *retain)
{
struct base_data *b;
struct thread_local *data = get_thread_data();
for (b = data->base_cache;
data->base_cache_used > delta_base_cache_limit && b;
b = b->child) {
if (b->data && b != retain)
struct list_head *pos;
if (base_cache_used <= base_cache_limit)
return;
list_for_each_prev(pos, &done_head) {
struct base_data *b = list_entry(pos, struct base_data, list);
if (b->retain_data || b == retain)
continue;
if (b->data) {
free_base_data(b);
if (base_cache_used <= base_cache_limit)
return;
}
}
}
static void link_base_data(struct base_data *base, struct base_data *c)
{
if (base)
base->child = c;
else
get_thread_data()->base_cache = c;
c->base = base;
c->child = NULL;
if (c->data)
get_thread_data()->base_cache_used += c->size;
prune_base_data(c);
}
static void unlink_base_data(struct base_data *c)
{
struct base_data *base = c->base;
if (base)
base->child = NULL;
else
get_thread_data()->base_cache = NULL;
free_base_data(c);
list_for_each_prev(pos, &work_head) {
struct base_data *b = list_entry(pos, struct base_data, list);
if (b->retain_data || b == retain)
continue;
if (b->data) {
free_base_data(b);
if (base_cache_used <= base_cache_limit)
return;
}
}
}
static int is_delta_type(enum object_type type)
@ -614,7 +642,7 @@ static int compare_ofs_delta_bases(off_t offset1, off_t offset2,
0;
}
static int find_ofs_delta(const off_t offset, enum object_type type)
static int find_ofs_delta(const off_t offset)
{
int first = 0, last = nr_ofs_deltas;
@ -624,7 +652,8 @@ static int find_ofs_delta(const off_t offset, enum object_type type)
int cmp;
cmp = compare_ofs_delta_bases(offset, delta->offset,
type, objects[delta->obj_no].type);
OBJ_OFS_DELTA,
objects[delta->obj_no].type);
if (!cmp)
return next;
if (cmp < 0) {
@ -637,10 +666,9 @@ static int find_ofs_delta(const off_t offset, enum object_type type)
}
static void find_ofs_delta_children(off_t offset,
int *first_index, int *last_index,
enum object_type type)
int *first_index, int *last_index)
{
int first = find_ofs_delta(offset, type);
int first = find_ofs_delta(offset);
int last = first;
int end = nr_ofs_deltas - 1;
@ -668,7 +696,7 @@ static int compare_ref_delta_bases(const struct object_id *oid1,
return oidcmp(oid1, oid2);
}
static int find_ref_delta(const struct object_id *oid, enum object_type type)
static int find_ref_delta(const struct object_id *oid)
{
int first = 0, last = nr_ref_deltas;
@ -678,7 +706,8 @@ static int find_ref_delta(const struct object_id *oid, enum object_type type)
int cmp;
cmp = compare_ref_delta_bases(oid, &delta->oid,
type, objects[delta->obj_no].type);
OBJ_REF_DELTA,
objects[delta->obj_no].type);
if (!cmp)
return next;
if (cmp < 0) {
@ -691,10 +720,9 @@ static int find_ref_delta(const struct object_id *oid, enum object_type type)
}
static void find_ref_delta_children(const struct object_id *oid,
int *first_index, int *last_index,
enum object_type type)
int *first_index, int *last_index)
{
int first = find_ref_delta(oid, type);
int first = find_ref_delta(oid);
int last = first;
int end = nr_ref_deltas - 1;
@ -866,15 +894,7 @@ static void sha1_object(const void *data, struct object_entry *obj_entry,
}
/*
* This function is part of find_unresolved_deltas(). There are two
* walkers going in the opposite ways.
*
* The first one in find_unresolved_deltas() traverses down from
* parent node to children, deflating nodes along the way. However,
* memory for deflated nodes is limited by delta_base_cache_limit, so
* at some point parent node's deflated content may be freed.
*
* The second walker is this function, which goes from current node up
* Walk from current node up
* to top parent if necessary to deflate the node. In normal
* situation, its parent node would be already deflated, so it just
* needs to apply delta.
@ -902,7 +922,7 @@ static void *get_base_data(struct base_data *c)
if (!delta_nr) {
c->data = get_data_from_pack(obj);
c->size = obj->size;
get_thread_data()->base_cache_used += c->size;
base_cache_used += c->size;
prune_base_data(c);
}
for (; delta_nr > 0; delta_nr--) {
@ -918,7 +938,7 @@ static void *get_base_data(struct base_data *c)
free(raw);
if (!c->data)
bad_object(obj->idx.offset, _("failed to apply delta"));
get_thread_data()->base_cache_used += c->size;
base_cache_used += c->size;
prune_base_data(c);
}
free(delta);
@ -926,10 +946,27 @@ static void *get_base_data(struct base_data *c)
return c->data;
}
static void resolve_delta(struct object_entry *delta_obj,
struct base_data *base, struct base_data *result)
static struct base_data *make_base(struct object_entry *obj,
struct base_data *parent)
{
void *base_data, *delta_data;
struct base_data *base = xcalloc(1, sizeof(struct base_data));
base->base = parent;
base->obj = obj;
find_ref_delta_children(&obj->idx.oid,
&base->ref_first, &base->ref_last);
find_ofs_delta_children(obj->idx.offset,
&base->ofs_first, &base->ofs_last);
base->children_remaining = base->ref_last - base->ref_first +
base->ofs_last - base->ofs_first + 2;
return base;
}
static struct base_data *resolve_delta(struct object_entry *delta_obj,
struct base_data *base)
{
void *delta_data, *result_data;
struct base_data *result;
unsigned long result_size;
if (show_stat) {
int i = delta_obj - objects;
@ -942,115 +979,26 @@ static void resolve_delta(struct object_entry *delta_obj,
obj_stat[i].base_object_no = j;
}
delta_data = get_data_from_pack(delta_obj);
base_data = get_base_data(base);
result->obj = delta_obj;
result->data = patch_delta(base_data, base->size,
delta_data, delta_obj->size, &result->size);
assert(base->data);
result_data = patch_delta(base->data, base->size,
delta_data, delta_obj->size, &result_size);
free(delta_data);
if (!result->data)
if (!result_data)
bad_object(delta_obj->idx.offset, _("failed to apply delta"));
hash_object_file(the_hash_algo, result->data, result->size,
hash_object_file(the_hash_algo, result_data, result_size,
type_name(delta_obj->real_type), &delta_obj->idx.oid);
sha1_object(result->data, NULL, result->size, delta_obj->real_type,
sha1_object(result_data, NULL, result_size, delta_obj->real_type,
&delta_obj->idx.oid);
result = make_base(delta_obj, base);
result->data = result_data;
result->size = result_size;
counter_lock();
nr_resolved_deltas++;
counter_unlock();
}
/*
* Standard boolean compare-and-swap: atomically check whether "*type" is
* "want"; if so, swap in "set" and return true. Otherwise, leave it untouched
* and return false.
*/
static int compare_and_swap_type(signed char *type,
enum object_type want,
enum object_type set)
{
enum object_type old;
type_cas_lock();
old = *type;
if (old == want)
*type = set;
type_cas_unlock();
return old == want;
}
static struct base_data *find_unresolved_deltas_1(struct base_data *base,
struct base_data *prev_base)
{
if (base->ref_last == -1 && base->ofs_last == -1) {
find_ref_delta_children(&base->obj->idx.oid,
&base->ref_first, &base->ref_last,
OBJ_REF_DELTA);
find_ofs_delta_children(base->obj->idx.offset,
&base->ofs_first, &base->ofs_last,
OBJ_OFS_DELTA);
if (base->ref_last == -1 && base->ofs_last == -1) {
free(base->data);
return NULL;
}
link_base_data(prev_base, base);
}
if (base->ref_first <= base->ref_last) {
struct object_entry *child = objects + ref_deltas[base->ref_first].obj_no;
struct base_data *result = alloc_base_data();
if (!compare_and_swap_type(&child->real_type, OBJ_REF_DELTA,
base->obj->real_type))
die("REF_DELTA at offset %"PRIuMAX" already resolved (duplicate base %s?)",
(uintmax_t)child->idx.offset,
oid_to_hex(&base->obj->idx.oid));
resolve_delta(child, base, result);
if (base->ref_first == base->ref_last && base->ofs_last == -1)
free_base_data(base);
base->ref_first++;
return result;
}
if (base->ofs_first <= base->ofs_last) {
struct object_entry *child = objects + ofs_deltas[base->ofs_first].obj_no;
struct base_data *result = alloc_base_data();
assert(child->real_type == OBJ_OFS_DELTA);
child->real_type = base->obj->real_type;
resolve_delta(child, base, result);
if (base->ofs_first == base->ofs_last)
free_base_data(base);
base->ofs_first++;
return result;
}
unlink_base_data(base);
return NULL;
}
static void find_unresolved_deltas(struct base_data *base)
{
struct base_data *new_base, *prev_base = NULL;
for (;;) {
new_base = find_unresolved_deltas_1(base, prev_base);
if (new_base) {
prev_base = base;
base = new_base;
} else {
free(base);
base = prev_base;
if (!base)
return;
prev_base = base->base;
}
}
return result;
}
static int compare_ofs_delta_entry(const void *a, const void *b)
@ -1071,34 +1019,131 @@ static int compare_ref_delta_entry(const void *a, const void *b)
return oidcmp(&delta_a->oid, &delta_b->oid);
}
static void resolve_base(struct object_entry *obj)
{
struct base_data *base_obj = alloc_base_data();
base_obj->obj = obj;
base_obj->data = NULL;
find_unresolved_deltas(base_obj);
}
static void *threaded_second_pass(void *data)
{
set_thread_data(data);
if (data)
set_thread_data(data);
for (;;) {
int i;
counter_lock();
display_progress(progress, nr_resolved_deltas);
counter_unlock();
struct base_data *parent = NULL;
struct object_entry *child_obj;
struct base_data *child;
work_lock();
while (nr_dispatched < nr_objects &&
is_delta_type(objects[nr_dispatched].type))
nr_dispatched++;
if (nr_dispatched >= nr_objects) {
work_unlock();
break;
if (list_empty(&work_head)) {
/*
* Take an object from the object array.
*/
while (nr_dispatched < nr_objects &&
is_delta_type(objects[nr_dispatched].type))
nr_dispatched++;
if (nr_dispatched >= nr_objects) {
work_unlock();
break;
}
child_obj = &objects[nr_dispatched++];
} else {
/*
* Peek at the top of the stack, and take a child from
* it.
*/
parent = list_first_entry(&work_head, struct base_data,
list);
if (parent->ref_first <= parent->ref_last) {
int offset = ref_deltas[parent->ref_first++].obj_no;
child_obj = objects + offset;
if (child_obj->real_type != OBJ_REF_DELTA)
die("REF_DELTA at offset %"PRIuMAX" already resolved (duplicate base %s?)",
(uintmax_t) child_obj->idx.offset,
oid_to_hex(&parent->obj->idx.oid));
child_obj->real_type = parent->obj->real_type;
} else {
child_obj = objects +
ofs_deltas[parent->ofs_first++].obj_no;
assert(child_obj->real_type == OBJ_OFS_DELTA);
child_obj->real_type = parent->obj->real_type;
}
if (parent->ref_first > parent->ref_last &&
parent->ofs_first > parent->ofs_last) {
/*
* This parent has run out of children, so move
* it to done_head.
*/
list_del(&parent->list);
list_add(&parent->list, &done_head);
}
/*
* Ensure that the parent has data, since we will need
* it later.
*
* NEEDSWORK: If parent data needs to be reloaded, this
* prolongs the time that the current thread spends in
* the mutex. A mitigating factor is that parent data
* needs to be reloaded only if the delta base cache
* limit is exceeded, so in the typical case, this does
* not happen.
*/
get_base_data(parent);
parent->retain_data++;
}
i = nr_dispatched++;
work_unlock();
resolve_base(&objects[i]);
if (parent) {
child = resolve_delta(child_obj, parent);
if (!child->children_remaining)
FREE_AND_NULL(child->data);
} else {
child = make_base(child_obj, NULL);
if (child->children_remaining) {
/*
* Since this child has its own delta children,
* we will need this data in the future.
* Inflate now so that future iterations will
* have access to this object's data while
* outside the work mutex.
*/
child->data = get_data_from_pack(child_obj);
child->size = child_obj->size;
}
}
work_lock();
if (parent)
parent->retain_data--;
if (child->data) {
/*
* This child has its own children, so add it to
* work_head.
*/
list_add(&child->list, &work_head);
base_cache_used += child->size;
prune_base_data(NULL);
} else {
/*
* This child does not have its own children. It may be
* the last descendant of its ancestors; free those
* that we can.
*/
struct base_data *p = parent;
while (p) {
struct base_data *next_p;
p->children_remaining--;
if (p->children_remaining)
break;
next_p = p->base;
free_base_data(p);
list_del(&p->list);
free(p);
p = next_p;
}
}
work_unlock();
}
return NULL;
}
@ -1199,6 +1244,7 @@ static void resolve_deltas(void)
nr_ref_deltas + nr_ofs_deltas);
nr_dispatched = 0;
base_cache_limit = delta_base_cache_limit * nr_threads;
if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
init_thread();
for (i = 0; i < nr_threads; i++) {
@ -1213,15 +1259,7 @@ static void resolve_deltas(void)
cleanup_thread();
return;
}
for (i = 0; i < nr_objects; i++) {
struct object_entry *obj = &objects[i];
if (is_delta_type(obj->type))
continue;
resolve_base(obj);
display_progress(progress, nr_resolved_deltas);
}
threaded_second_pass(&nothread_data);
}
/*
@ -1376,22 +1414,28 @@ static void fix_unresolved_deltas(struct hashfile *f)
for (i = 0; i < nr_ref_deltas; i++) {
struct ref_delta_entry *d = sorted_by_pos[i];
enum object_type type;
struct base_data *base_obj = alloc_base_data();
void *data;
unsigned long size;
if (objects[d->obj_no].real_type != OBJ_REF_DELTA)
continue;
base_obj->data = read_object_file(&d->oid, &type,
&base_obj->size);
if (!base_obj->data)
data = read_object_file(&d->oid, &type, &size);
if (!data)
continue;
if (check_object_signature(the_repository, &d->oid,
base_obj->data, base_obj->size,
data, size,
type_name(type)))
die(_("local object %s is corrupt"), oid_to_hex(&d->oid));
base_obj->obj = append_obj_to_pack(f, d->oid.hash,
base_obj->data, base_obj->size, type);
find_unresolved_deltas(base_obj);
/*
* Add this as an object to the objects array and call
* threaded_second_pass() (which will pick up the added
* object).
*/
append_obj_to_pack(f, d->oid.hash, data, size, type);
threaded_second_pass(NULL);
display_progress(progress, nr_resolved_deltas);
}
free(sorted_by_pos);