Merge branch 'jt/fetch-pack-negotiator'

Code restructuring and a small fix to transport protocol v2 during
fetching.

* jt/fetch-pack-negotiator:
  fetch-pack: introduce negotiator API
  fetch-pack: move common check and marking together
  fetch-pack: make negotiation-related vars local
  fetch-pack: use ref adv. to prune "have" sent
  fetch-pack: directly end negotiation if ACK ready
  fetch-pack: clear marks before re-marking
  fetch-pack: split up everything_local()
This commit is contained in:
Junio C Hamano
2018-08-02 15:30:41 -07:00
8 changed files with 373 additions and 169 deletions

View File

@ -15,12 +15,12 @@
#include "connect.h"
#include "transport.h"
#include "version.h"
#include "prio-queue.h"
#include "sha1-array.h"
#include "oidset.h"
#include "packfile.h"
#include "object-store.h"
#include "connected.h"
#include "fetch-negotiator.h"
static int transfer_unpack_limit = -1;
static int fetch_unpack_limit = -1;
@ -38,13 +38,7 @@ static const char *alternate_shallow_file;
/* Remember to update object flag allocation in object.h */
#define COMPLETE (1U << 0)
#define COMMON (1U << 1)
#define COMMON_REF (1U << 2)
#define SEEN (1U << 3)
#define POPPED (1U << 4)
#define ALTERNATE (1U << 5)
static int marked;
#define ALTERNATE (1U << 1)
/*
* After sending this many "have"s if we do not get any new ACK , we
@ -52,8 +46,7 @@ static int marked;
*/
#define MAX_IN_VAIN 256
static struct prio_queue rev_list = { compare_commits_by_commit_date };
static int non_common_revs, multi_ack, use_sideband;
static int multi_ack, use_sideband;
/* Allow specifying sha1 if it is a ref tip. */
#define ALLOW_TIP_SHA1 01
/* Allow request of a sha1 if it is reachable from a ref (possibly hidden ref). */
@ -95,7 +88,9 @@ static void cache_one_alternate(const char *refname,
cache->items[cache->nr++] = obj;
}
static void for_each_cached_alternate(void (*cb)(struct object *))
static void for_each_cached_alternate(struct fetch_negotiator *negotiator,
void (*cb)(struct fetch_negotiator *,
struct object *))
{
static int initialized;
static struct alternate_object_cache cache;
@ -107,30 +102,17 @@ static void for_each_cached_alternate(void (*cb)(struct object *))
}
for (i = 0; i < cache.nr; i++)
cb(cache.items[i]);
cb(negotiator, cache.items[i]);
}
static void rev_list_push(struct commit *commit, int mark)
{
if (!(commit->object.flags & mark)) {
commit->object.flags |= mark;
if (parse_commit(commit))
return;
prio_queue_put(&rev_list, commit);
if (!(commit->object.flags & COMMON))
non_common_revs++;
}
}
static int rev_list_insert_ref(const char *refname, const struct object_id *oid)
static int rev_list_insert_ref(struct fetch_negotiator *negotiator,
const char *refname,
const struct object_id *oid)
{
struct object *o = deref_tag(parse_object(oid), refname, 0);
if (o && o->type == OBJ_COMMIT)
rev_list_push((struct commit *)o, SEEN);
negotiator->add_tip(negotiator, (struct commit *)o);
return 0;
}
@ -138,98 +120,7 @@ static int rev_list_insert_ref(const char *refname, const struct object_id *oid)
static int rev_list_insert_ref_oid(const char *refname, const struct object_id *oid,
int flag, void *cb_data)
{
return rev_list_insert_ref(refname, oid);
}
static int clear_marks(const char *refname, const struct object_id *oid,
int flag, void *cb_data)
{
struct object *o = deref_tag(parse_object(oid), refname, 0);
if (o && o->type == OBJ_COMMIT)
clear_commit_marks((struct commit *)o,
COMMON | COMMON_REF | SEEN | POPPED);
return 0;
}
/*
This function marks a rev and its ancestors as common.
In some cases, it is desirable to mark only the ancestors (for example
when only the server does not yet know that they are common).
*/
static void mark_common(struct commit *commit,
int ancestors_only, int dont_parse)
{
if (commit != NULL && !(commit->object.flags & COMMON)) {
struct object *o = (struct object *)commit;
if (!ancestors_only)
o->flags |= COMMON;
if (!(o->flags & SEEN))
rev_list_push(commit, SEEN);
else {
struct commit_list *parents;
if (!ancestors_only && !(o->flags & POPPED))
non_common_revs--;
if (!o->parsed && !dont_parse)
if (parse_commit(commit))
return;
for (parents = commit->parents;
parents;
parents = parents->next)
mark_common(parents->item, 0, dont_parse);
}
}
}
/*
Get the next rev to send, ignoring the common.
*/
static const struct object_id *get_rev(void)
{
struct commit *commit = NULL;
while (commit == NULL) {
unsigned int mark;
struct commit_list *parents;
if (rev_list.nr == 0 || non_common_revs == 0)
return NULL;
commit = prio_queue_get(&rev_list);
parse_commit(commit);
parents = commit->parents;
commit->object.flags |= POPPED;
if (!(commit->object.flags & COMMON))
non_common_revs--;
if (commit->object.flags & COMMON) {
/* do not send "have", and ignore ancestors */
commit = NULL;
mark = COMMON | SEEN;
} else if (commit->object.flags & COMMON_REF)
/* send "have", and ignore ancestors */
mark = COMMON | SEEN;
else
/* send "have", also for its ancestors */
mark = SEEN;
while (parents) {
if (!(parents->item->object.flags & SEEN))
rev_list_push(parents->item, mark);
if (mark & COMMON)
mark_common(parents->item, 1, 0);
parents = parents->next;
}
}
return &commit->object.oid;
return rev_list_insert_ref(cb_data, refname, oid);
}
enum ack_type {
@ -298,9 +189,10 @@ static void send_request(struct fetch_pack_args *args,
write_or_die(fd, buf->buf, buf->len);
}
static void insert_one_alternate_object(struct object *obj)
static void insert_one_alternate_object(struct fetch_negotiator *negotiator,
struct object *obj)
{
rev_list_insert_ref(NULL, &obj->oid);
rev_list_insert_ref(negotiator, NULL, &obj->oid);
}
#define INITIAL_FLUSH 16
@ -323,7 +215,8 @@ static int next_flush(int stateless_rpc, int count)
return count;
}
static int find_common(struct fetch_pack_args *args,
static int find_common(struct fetch_negotiator *negotiator,
struct fetch_pack_args *args,
int fd[2], struct object_id *result_oid,
struct ref *refs)
{
@ -338,12 +231,9 @@ static int find_common(struct fetch_pack_args *args,
if (args->stateless_rpc && multi_ack == 1)
die(_("--stateless-rpc requires multi_ack_detailed"));
if (marked)
for_each_ref(clear_marks, NULL);
marked = 1;
for_each_ref(rev_list_insert_ref_oid, NULL);
for_each_cached_alternate(insert_one_alternate_object);
for_each_ref(rev_list_insert_ref_oid, negotiator);
for_each_cached_alternate(negotiator, insert_one_alternate_object);
fetching = 0;
for ( ; refs ; refs = refs->next) {
@ -461,7 +351,7 @@ static int find_common(struct fetch_pack_args *args,
retval = -1;
if (args->no_dependents)
goto done;
while ((oid = get_rev())) {
while ((oid = negotiator->next(negotiator))) {
packet_buf_write(&req_buf, "have %s\n", oid_to_hex(oid));
print_verbose(args, "have %s", oid_to_hex(oid));
in_vain++;
@ -498,11 +388,13 @@ static int find_common(struct fetch_pack_args *args,
case ACK_continue: {
struct commit *commit =
lookup_commit(result_oid);
int was_common;
if (!commit)
die(_("invalid commit %s"), oid_to_hex(result_oid));
was_common = negotiator->ack(negotiator, commit);
if (args->stateless_rpc
&& ack == ACK_common
&& !(commit->object.flags & COMMON)) {
&& !was_common) {
/* We need to replay the have for this object
* on the next RPC request so the peer knows
* it is in common with us.
@ -519,13 +411,10 @@ static int find_common(struct fetch_pack_args *args,
} else if (!args->stateless_rpc
|| ack != ACK_common)
in_vain = 0;
mark_common(commit, 0, 1);
retval = 0;
got_continue = 1;
if (ack == ACK_ready) {
clear_prio_queue(&rev_list);
if (ack == ACK_ready)
got_ready = 1;
}
break;
}
}
@ -535,6 +424,8 @@ static int find_common(struct fetch_pack_args *args,
print_verbose(args, _("giving up"));
break; /* give up */
}
if (got_ready)
break;
}
}
done:
@ -709,7 +600,8 @@ static void filter_refs(struct fetch_pack_args *args,
*refs = newlist;
}
static void mark_alternate_complete(struct object *obj)
static void mark_alternate_complete(struct fetch_negotiator *unused,
struct object *obj)
{
mark_complete(&obj->oid);
}
@ -736,12 +628,21 @@ static int add_loose_objects_to_set(const struct object_id *oid,
return 0;
}
static int everything_local(struct fetch_pack_args *args,
struct ref **refs,
struct ref **sought, int nr_sought)
/*
* Mark recent commits available locally and reachable from a local ref as
* COMPLETE. If args->no_dependents is false, also mark COMPLETE remote refs as
* COMMON_REF (otherwise, we are not planning to participate in negotiation, and
* thus do not need COMMON_REF marks).
*
* The cutoff time for recency is determined by this heuristic: it is the
* earliest commit time of the objects in refs that are commits and that we know
* the commit time of.
*/
static void mark_complete_and_common_ref(struct fetch_negotiator *negotiator,
struct fetch_pack_args *args,
struct ref **refs)
{
struct ref *ref;
int retval;
int old_save_commit_buffer = save_commit_buffer;
timestamp_t cutoff = 0;
struct oidset loose_oid_set = OIDSET_INIT;
@ -789,7 +690,7 @@ static int everything_local(struct fetch_pack_args *args,
if (!args->no_dependents) {
if (!args->deepen) {
for_each_ref(mark_complete_oid, NULL);
for_each_cached_alternate(mark_alternate_complete);
for_each_cached_alternate(NULL, mark_alternate_complete);
commit_list_sort_by_date(&complete);
if (cutoff)
mark_recent_complete_commits(args, cutoff);
@ -806,15 +707,23 @@ static int everything_local(struct fetch_pack_args *args,
if (!o || o->type != OBJ_COMMIT || !(o->flags & COMPLETE))
continue;
if (!(o->flags & SEEN)) {
rev_list_push((struct commit *)o, COMMON_REF | SEEN);
mark_common((struct commit *)o, 1, 1);
}
negotiator->known_common(negotiator,
(struct commit *)o);
}
}
filter_refs(args, refs, sought, nr_sought);
save_commit_buffer = old_save_commit_buffer;
}
/*
* Returns 1 if every object pointed to by the given remote refs is available
* locally and reachable from a local ref, and 0 otherwise.
*/
static int everything_local(struct fetch_pack_args *args,
struct ref **refs)
{
struct ref *ref;
int retval;
for (retval = 1, ref = *refs; ref ; ref = ref->next) {
const struct object_id *remote = &ref->old_oid;
@ -831,8 +740,6 @@ static int everything_local(struct fetch_pack_args *args,
ref->name);
}
save_commit_buffer = old_save_commit_buffer;
return retval;
}
@ -983,6 +890,8 @@ static struct ref *do_fetch_pack(struct fetch_pack_args *args,
struct object_id oid;
const char *agent_feature;
int agent_len;
struct fetch_negotiator negotiator;
fetch_negotiator_init(&negotiator);
sort_ref_list(&ref, ref_compare_name);
QSORT(sought, nr_sought, cmp_ref_by_name);
@ -1055,11 +964,13 @@ static struct ref *do_fetch_pack(struct fetch_pack_args *args,
if (!server_supports("deepen-relative") && args->deepen_relative)
die(_("Server does not support --deepen"));
if (everything_local(args, &ref, sought, nr_sought)) {
mark_complete_and_common_ref(&negotiator, args, &ref);
filter_refs(args, &ref, sought, nr_sought);
if (everything_local(args, &ref)) {
packet_flush(fd[1]);
goto all_done;
}
if (find_common(args, fd, &oid, ref) < 0)
if (find_common(&negotiator, args, fd, &oid, ref) < 0)
if (!args->keep_pack)
/* When cloning, it is not unusual to have
* no common commit.
@ -1079,6 +990,7 @@ static struct ref *do_fetch_pack(struct fetch_pack_args *args,
die(_("git fetch-pack: fetch failed."));
all_done:
negotiator.release(&negotiator);
return ref;
}
@ -1143,13 +1055,15 @@ static void add_common(struct strbuf *req_buf, struct oidset *common)
}
}
static int add_haves(struct strbuf *req_buf, int *haves_to_send, int *in_vain)
static int add_haves(struct fetch_negotiator *negotiator,
struct strbuf *req_buf,
int *haves_to_send, int *in_vain)
{
int ret = 0;
int haves_added = 0;
const struct object_id *oid;
while ((oid = get_rev())) {
while ((oid = negotiator->next(negotiator))) {
packet_buf_write(req_buf, "have %s\n", oid_to_hex(oid));
if (++haves_added >= *haves_to_send)
break;
@ -1168,7 +1082,8 @@ static int add_haves(struct strbuf *req_buf, int *haves_to_send, int *in_vain)
return ret;
}
static int send_fetch_request(int fd_out, const struct fetch_pack_args *args,
static int send_fetch_request(struct fetch_negotiator *negotiator, int fd_out,
const struct fetch_pack_args *args,
const struct ref *wants, struct oidset *common,
int *haves_to_send, int *in_vain)
{
@ -1224,7 +1139,7 @@ static int send_fetch_request(int fd_out, const struct fetch_pack_args *args,
add_common(&req_buf, common);
/* Add initial haves */
ret = add_haves(&req_buf, haves_to_send, in_vain);
ret = add_haves(negotiator, &req_buf, haves_to_send, in_vain);
}
/* Send request */
@ -1261,7 +1176,9 @@ static int process_section_header(struct packet_reader *reader,
return ret;
}
static int process_acks(struct packet_reader *reader, struct oidset *common)
static int process_acks(struct fetch_negotiator *negotiator,
struct packet_reader *reader,
struct oidset *common)
{
/* received */
int received_ready = 0;
@ -1280,13 +1197,12 @@ static int process_acks(struct packet_reader *reader, struct oidset *common)
struct commit *commit;
oidset_insert(common, &oid);
commit = lookup_commit(&oid);
mark_common(commit, 0, 1);
negotiator->ack(negotiator, commit);
}
continue;
}
if (!strcmp(reader->line, "ready")) {
clear_prio_queue(&rev_list);
received_ready = 1;
continue;
}
@ -1385,6 +1301,8 @@ static struct ref *do_fetch_pack_v2(struct fetch_pack_args *args,
struct packet_reader reader;
int in_vain = 0;
int haves_to_send = INITIAL_FLUSH;
struct fetch_negotiator negotiator;
fetch_negotiator_init(&negotiator);
packet_reader_init(&reader, fd[0], NULL, 0,
PACKET_READ_CHOMP_NEWLINE);
@ -1400,21 +1318,21 @@ static struct ref *do_fetch_pack_v2(struct fetch_pack_args *args,
if (args->depth > 0 || args->deepen_since || args->deepen_not)
args->deepen = 1;
if (marked)
for_each_ref(clear_marks, NULL);
marked = 1;
for_each_ref(rev_list_insert_ref_oid, NULL);
for_each_cached_alternate(insert_one_alternate_object);
/* Filter 'ref' by 'sought' and those that aren't local */
if (everything_local(args, &ref, sought, nr_sought))
mark_complete_and_common_ref(&negotiator, args, &ref);
filter_refs(args, &ref, sought, nr_sought);
if (everything_local(args, &ref))
state = FETCH_DONE;
else
state = FETCH_SEND_REQUEST;
for_each_ref(rev_list_insert_ref_oid, &negotiator);
for_each_cached_alternate(&negotiator,
insert_one_alternate_object);
break;
case FETCH_SEND_REQUEST:
if (send_fetch_request(fd[1], args, ref, &common,
if (send_fetch_request(&negotiator, fd[1], args, ref,
&common,
&haves_to_send, &in_vain))
state = FETCH_GET_PACK;
else
@ -1422,7 +1340,7 @@ static struct ref *do_fetch_pack_v2(struct fetch_pack_args *args,
break;
case FETCH_PROCESS_ACKS:
/* Process ACKs/NAKs */
switch (process_acks(&reader, &common)) {
switch (process_acks(&negotiator, &reader, &common)) {
case 2:
state = FETCH_GET_PACK;
break;
@ -1454,6 +1372,7 @@ static struct ref *do_fetch_pack_v2(struct fetch_pack_args *args,
}
}
negotiator.release(&negotiator);
oidset_clear(&common);
return ref;
}