librsync  2.3.2
delta.c
Go to the documentation of this file.
1 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  *
3  * librsync -- library for network deltas
4  *
5  * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6  * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation; either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22 
23  /*=
24  | Let's climb to the TOP of that
25  | MOUNTAIN and think about STRIP
26  | MINING!!
27  */
28 
29 /** \file delta.c
30  * Generate in streaming mode an rsync delta given a set of signatures, and a
31  * new file.
32  *
33  * The size of blocks for signature generation is determined by the block size
34  * in the incoming signature.
35  *
36  * To calculate a signature, we need to be able to see at least one block of
37  * the new file at a time. Once we have that, we calculate its weak signature,
38  * and see if there is any block in the signature hash table that has the same
39  * weak sum. If there is one, then we also compute the strong sum of the new
40  * block, and cross check that. If they're the same, then we can assume we have
41  * a match.
42  *
43  * The final block of the file has to be handled a little differently, because
44  * it may be a short match. Short blocks in the signature don't include their
45  * length -- we just allow for the final short block of the file to match any
46  * block in the signature, and if they have the same checksum we assume they
47  * must have the same length. Therefore, when we emit a COPY command, we have
48  * to send it with a length that is the same as the block matched, and not the
49  * block length from the signature.
50  *
51  * Profiling results as of v1.26, 2001-03-18:
52  *
53  * If everything matches, then we spend almost all our time in rs_mdfour64 and
54  * rs_weak_sum, which is unavoidable and therefore a good profile.
55  *
56  * If nothing matches, it is not so good.
57  *
58  * 2002-06-26: Donovan Baarda
59  *
60  * The following is based entirely on pysync. It is much cleaner than the
61  * previous incarnation of this code. It is slightly complicated because in
62  * this case the output can block, so the main delta loop needs to stop when
63  * this happens.
64  *
65  * In pysync a 'last' attribute is used to hold the last miss or match for
66  * extending if possible. In this code, basis_len and scoop_pos are used
67  * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0
68  * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie,
69  * nothing).
70  *
71  * Pysync is also slightly different in that a 'flush' method is available to
72  * force output of accumulated data. This 'flush' is use to finalise delta
73  * calculation. In librsync input is terminated with an eof flag on the input
74  * stream. I have structured this code similar to pysync with a seperate flush
75  * function that is used when eof is reached. This allows for a flush style API
76  * if one is ever needed. Note that flush in pysync can be used for more than
77  * just terminating delta calculation, so a flush based API can in some ways be
78  * more flexible...
79  *
80  * The input data is first scanned, then processed. Scanning identifies input
81  * data as misses or matches, and emits the instruction stream. Processing the
82  * data consumes it off the input scoop and outputs the processed miss data
83  * into the tube.
84  *
85  * The scoop contains all data yet to be processed. The scoop_pos is an index
86  * into the scoop that indicates the point scanned to. As data is scanned,
87  * scoop_pos is incremented. As data is processed, it is removed from the scoop
88  * and scoop_pos adjusted. Everything gets complicated because the tube can
89  * block. When the tube is blocked, no data can be processed. */
90 
91 #include "config.h"
92 #include <assert.h>
93 #include <stdlib.h>
94 #include "librsync.h"
95 #include "job.h"
96 #include "sumset.h"
97 #include "checksum.h"
98 #include "stream.h"
99 #include "emit.h"
100 #include "trace.h"
101 
102 static rs_result rs_delta_s_scan(rs_job_t *job);
103 static rs_result rs_delta_s_flush(rs_job_t *job);
104 static rs_result rs_delta_s_end(rs_job_t *job);
105 static inline void rs_getinput(rs_job_t *job);
106 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
107  size_t *match_len);
108 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
109  size_t match_len);
110 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
111 static inline rs_result rs_appendflush(rs_job_t *job);
112 static inline rs_result rs_processmatch(rs_job_t *job);
113 static inline rs_result rs_processmiss(rs_job_t *job);
114 
115 /** Get a block of data if possible, and see if it matches.
116  *
117  * On each call, we try to process all of the input data available on the scoop
118  * and input buffer. */
120 {
121  const size_t block_len = job->signature->block_len;
122  rs_long_t match_pos;
123  size_t match_len;
124  rs_result result;
125 
126  rs_job_check(job);
127  /* read the input into the scoop */
128  rs_getinput(job);
129  /* output any pending output from the tube */
130  result = rs_tube_catchup(job);
131  /* while output is not blocked and there is a block of data */
132  while ((result == RS_DONE)
133  && ((job->scoop_pos + block_len) < job->scoop_avail)) {
134  /* check if this block matches */
135  if (rs_findmatch(job, &match_pos, &match_len)) {
136  /* append the match and reset the weak_sum */
137  result = rs_appendmatch(job, match_pos, match_len);
138  weaksum_reset(&job->weak_sum);
139  } else {
140  /* rotate the weak_sum and append the miss byte */
141  weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
142  job->scoop_next[job->scoop_pos + block_len]);
143  result = rs_appendmiss(job, 1);
144  }
145  }
146  /* if we completed OK */
147  if (result == RS_DONE) {
148  /* if we reached eof, we can flush the last fragment */
149  if (job->stream->eof_in) {
150  job->statefn = rs_delta_s_flush;
151  return RS_RUNNING;
152  } else {
153  /* we are blocked waiting for more data */
154  return RS_BLOCKED;
155  }
156  }
157  return result;
158 }
159 
160 static rs_result rs_delta_s_flush(rs_job_t *job)
161 {
162  rs_long_t match_pos;
163  size_t match_len;
164  rs_result result;
165 
166  rs_job_check(job);
167  /* read the input into the scoop */
168  rs_getinput(job);
169  /* output any pending output */
170  result = rs_tube_catchup(job);
171  /* while output is not blocked and there is any remaining data */
172  while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
173  /* check if this block matches */
174  if (rs_findmatch(job, &match_pos, &match_len)) {
175  /* append the match and reset the weak_sum */
176  result = rs_appendmatch(job, match_pos, match_len);
177  weaksum_reset(&job->weak_sum);
178  } else {
179  /* rollout from weak_sum and append the miss byte */
180  weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]);
181  rs_trace("block reduced to " FMT_SIZE "",
182  weaksum_count(&job->weak_sum));
183  result = rs_appendmiss(job, 1);
184  }
185  }
186  /* if we are not blocked, flush and set end statefn. */
187  if (result == RS_DONE) {
188  result = rs_appendflush(job);
189  job->statefn = rs_delta_s_end;
190  }
191  if (result == RS_DONE) {
192  return RS_RUNNING;
193  }
194  return result;
195 }
196 
197 static rs_result rs_delta_s_end(rs_job_t *job)
198 {
199  rs_emit_end_cmd(job);
200  return RS_DONE;
201 }
202 
203 static inline void rs_getinput(rs_job_t *job)
204 {
205  size_t len;
206 
207  len = rs_scoop_total_avail(job);
208  if (job->scoop_avail < len) {
209  rs_scoop_input(job, len);
210  }
211 }
212 
213 /** find a match at scoop_pos, returning the match_pos and match_len.
214  *
215  * Note that this will calculate weak_sum if required. It will also determine
216  * the match_len.
217  *
218  * This routine could be modified to do xdelta style matches that would extend
219  * matches past block boundaries by matching backwards and forwards beyond the
220  * block boundaries. Extending backwards would require decrementing scoop_pos
221  * as appropriate. */
222 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
223  size_t *match_len)
224 {
225  const size_t block_len = job->signature->block_len;
226 
227  /* calculate the weak_sum if we don't have one */
228  if (weaksum_count(&job->weak_sum) == 0) {
229  /* set match_len to min(block_len, scan_avail) */
230  *match_len = job->scoop_avail - job->scoop_pos;
231  if (*match_len > block_len) {
232  *match_len = block_len;
233  }
234  /* Update the weak_sum */
235  weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos,
236  *match_len);
237  rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
238  weaksum_count(&job->weak_sum));
239  } else {
240  /* set the match_len to the weak_sum count */
241  *match_len = weaksum_count(&job->weak_sum);
242  }
243  *match_pos =
244  rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
245  job->scoop_next + job->scoop_pos, *match_len);
246  return *match_pos != -1;
247 }
248 
249 /** Append a match at match_pos of length match_len to the delta, extending a
250  * previous match if possible, or flushing any previous miss/match. */
251 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
252  size_t match_len)
253 {
254  rs_result result = RS_DONE;
255 
256  /* if last was a match that can be extended, extend it */
257  if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
258  job->basis_len += match_len;
259  } else {
260  /* else appendflush the last value */
261  result = rs_appendflush(job);
262  /* make this the new match value */
263  job->basis_pos = match_pos;
264  job->basis_len = match_len;
265  }
266  /* increment scoop_pos to point at next unscanned data */
267  job->scoop_pos += match_len;
268  /* we can only process from the scoop if output is not blocked */
269  if (result == RS_DONE) {
270  /* process the match data off the scoop */
271  result = rs_processmatch(job);
272  }
273  return result;
274 }
275 
276 /** Append a miss of length miss_len to the delta, extending a previous miss
277  * if possible, or flushing any previous match.
278  *
279  * This also breaks misses up into 32KB segments to avoid accumulating too much
280  * in memory. */
281 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
282 {
283  const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */
284  rs_result result = RS_DONE;
285 
286  /* If last was a match, or max_miss misses, appendflush it. */
287  if (job->basis_len || (job->scoop_pos >= max_miss)) {
288  result = rs_appendflush(job);
289  }
290  /* increment scoop_pos */
291  job->scoop_pos += miss_len;
292  return result;
293 }
294 
295 /** Flush any accumulating hit or miss, appending it to the delta. */
296 static inline rs_result rs_appendflush(rs_job_t *job)
297 {
298  /* if last is a match, emit it and reset last by resetting basis_len */
299  if (job->basis_len) {
300  rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len,
301  job->basis_pos);
302  rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
303  job->basis_len = 0;
304  return rs_processmatch(job);
305  /* else if last is a miss, emit and process it */
306  } else if (job->scoop_pos) {
307  rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos);
308  rs_emit_literal_cmd(job, (int)job->scoop_pos);
309  return rs_processmiss(job);
310  }
311  /* otherwise, nothing to flush so we are done */
312  return RS_DONE;
313 }
314 
315 /** Process matching data in the scoop.
316  *
317  * The scoop contains match data at scoop_next of length scoop_pos. This
318  * function processes that match data, returning RS_DONE if it completes, or
319  * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
320  * still point at the next unscanned data.
321  *
322  * This function currently just removes data from the scoop and adjusts
323  * scoop_pos appropriately. In the future this could be used for something like
324  * context compressing of miss data. Note that it also calls rs_tube_catchup to
325  * output any pending output. */
326 static inline rs_result rs_processmatch(rs_job_t *job)
327 {
328  job->scoop_avail -= job->scoop_pos;
329  job->scoop_next += job->scoop_pos;
330  job->scoop_pos = 0;
331  return rs_tube_catchup(job);
332 }
333 
334 /** Process miss data in the scoop.
335  *
336  * The scoop contains miss data at scoop_next of length scoop_pos. This
337  * function processes that miss data, returning RS_DONE if it completes, or
338  * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
339  * still point at the next unscanned data.
340  *
341  * This function uses rs_tube_copy to queue copying from the scoop into output.
342  * and uses rs_tube_catchup to do the copying. This automaticly removes data
343  * from the scoop, but this can block. While rs_tube_catchup is blocked,
344  * scoop_pos does not point at legit data, so scanning can also not proceed.
345  *
346  * In the future this could do compression of miss data before outputing it. */
347 static inline rs_result rs_processmiss(rs_job_t *job)
348 {
349  rs_tube_copy(job, job->scoop_pos);
350  job->scoop_pos = 0;
351  return rs_tube_catchup(job);
352 }
353 
354 /** State function that does a slack delta containing only literal data to
355  * recreate the input. */
357 {
358  rs_buffers_t *const stream = job->stream;
359  size_t avail = stream->avail_in;
360 
361  if (avail) {
362  rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
363  rs_emit_literal_cmd(job, (int)avail);
364  rs_tube_copy(job, avail);
365  return RS_RUNNING;
366  } else if (rs_job_input_is_ending(job)) {
367  job->statefn = rs_delta_s_end;
368  return RS_RUNNING;
369  }
370  return RS_BLOCKED;
371 }
372 
373 /** State function for writing out the header of the encoding job. */
375 {
377  if (job->signature) {
378  job->statefn = rs_delta_s_scan;
379  } else {
380  rs_trace("no signature provided for delta, using slack deltas");
381  job->statefn = rs_delta_s_slack;
382  }
383  return RS_RUNNING;
384 }
385 
387 {
388  rs_job_t *job;
389 
390  job = rs_job_new("delta", rs_delta_s_header);
391  /* Caller can pass NULL sig or empty sig for "slack deltas". */
392  if (sig && sig->count > 0) {
393  rs_signature_check(sig);
394  /* Caller must have called rs_build_hash_table() by now. */
395  assert(sig->hashtable);
396  job->signature = sig;
397  weaksum_init(&job->weak_sum, rs_signature_weaksum_kind(sig));
398  }
399  return job;
400 }
static int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len)
find a match at scoop_pos, returning the match_pos and match_len.
Definition: delta.c:222
static rs_result rs_processmiss(rs_job_t *job)
Process miss data in the scoop.
Definition: delta.c:347
static rs_result rs_processmatch(rs_job_t *job)
Process matching data in the scoop.
Definition: delta.c:326
rs_job_t * rs_delta_begin(rs_signature_t *sig)
Prepare to compute a streaming delta.
Definition: delta.c:386
static rs_result rs_delta_s_scan(rs_job_t *job)
Get a block of data if possible, and see if it matches.
Definition: delta.c:119
static rs_result rs_delta_s_slack(rs_job_t *job)
State function that does a slack delta containing only literal data to recreate the input.
Definition: delta.c:356
static rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
Append a miss of length miss_len to the delta, extending a previous miss if possible,...
Definition: delta.c:281
static rs_result rs_appendflush(rs_job_t *job)
Flush any accumulating hit or miss, appending it to the delta.
Definition: delta.c:296
static rs_result rs_delta_s_header(rs_job_t *job)
State function for writing out the header of the encoding job.
Definition: delta.c:374
static rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
Append a match at match_pos of length match_len to the delta, extending a previous match if possible,...
Definition: delta.c:251
void rs_emit_end_cmd(rs_job_t *job)
Write an END command.
Definition: emit.c:120
void rs_emit_literal_cmd(rs_job_t *job, int len)
Write a LITERAL command.
Definition: emit.c:47
void rs_emit_copy_cmd(rs_job_t *job, rs_long_t where, rs_long_t len)
Write a COPY command for given offset and length.
Definition: emit.c:80
void rs_emit_delta_header(rs_job_t *job)
Write the magic for the start of a delta.
Definition: emit.c:40
How to emit commands to the client.
Public header for librsync.
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:180
@ RS_RUNNING
The job is still running, and not yet finished or blocked.
Definition: librsync.h:183
@ RS_DONE
Completed successfully.
Definition: librsync.h:181
@ RS_BLOCKED
Blocked waiting for more data.
Definition: librsync.h:182
size_t rs_scoop_total_avail(rs_job_t *job)
Return the total number of bytes available including the scoop and input buffer.
Definition: scoop.c:227
void rs_scoop_input(rs_job_t *job, size_t len)
Try to accept a from the input buffer to get LEN bytes in the scoop.
Definition: scoop.c:67
Manage librsync streams of IO.
rs_result rs_tube_catchup(rs_job_t *job)
Put whatever will fit from the tube into the output of the stream.
Definition: tube.c:159
void rs_tube_copy(rs_job_t *job, size_t len)
Queue up a request to copy through len bytes from the input to the output of the stream.
Definition: tube.c:200
Description of input and output buffers.
Definition: librsync.h:322
size_t avail_in
Number of bytes available at next_in.
Definition: librsync.h:336
int eof_in
True if there is no more data after this.
Definition: librsync.h:339
The contents of this structure are private.
Definition: job.h:26
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:92
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:35
rs_signature_t * signature
Pointer to the signature that's being used by the operation.
Definition: job.h:51
weaksum_t weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:63
Signature of a whole file.
Definition: sumset.h:37
int count
Total number of blocks.
Definition: sumset.h:41
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:44
int block_len
The block length.
Definition: sumset.h:39
logging functions.