librsync  2.0.1
delta.c
1 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  *
3  * librsync -- library for network deltas
4  *
5  * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6  * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation; either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22 
23  /*
24  | Let's climb to the TOP of that
25  | MOUNTAIN and think about STRIP
26  | MINING!!
27  */
28 
29 
30 /*
31  * delta.c -- Generate in streaming mode an rsync delta given a set of
32  * signatures, and a new file.
33  *
34  * The size of blocks for signature generation is determined by the
35  * block size in the incoming signature.
36  *
37  * To calculate a signature, we need to be able to see at least one
38  * block of the new file at a time. Once we have that, we calculate
39  * its weak signature, and see if there is any block in the signature
40  * hash table that has the same weak sum. If there is one, then we
41  * also compute the strong sum of the new block, and cross check that.
42  * If they're the same, then we can assume we have a match.
43  *
44  * The final block of the file has to be handled a little differently,
45  * because it may be a short match. Short blocks in the signature
46  * don't include their length -- we just allow for the final short
47  * block of the file to match any block in the signature, and if they
48  * have the same checksum we assume they must have the same length.
49  * Therefore, when we emit a COPY command, we have to send it with a
50  * length that is the same as the block matched, and not the block
51  * length from the signature.
52  */
53 
54 /*
55  * Profiling results as of v1.26, 2001-03-18:
56  *
57  * If everything matches, then we spend almost all our time in
58  * rs_mdfour64 and rs_weak_sum, which is unavoidable and therefore a
59  * good profile.
60  *
61  * If nothing matches, it is not so good.
62  */
63 
64 
65 #include "config.h"
66 
67 #include <assert.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 
71 #include "librsync.h"
72 #include "emit.h"
73 #include "stream.h"
74 #include "util.h"
75 #include "sumset.h"
76 #include "job.h"
77 #include "trace.h"
78 #include "search.h"
79 #include "rollsum.h"
80 
81 /**
82  * 2002-06-26: Donovan Baarda
83  *
84  * The following is based entirely on pysync. It is much cleaner than the
85  * previous incarnation of this code. It is slightly complicated because in
86  * this case the output can block, so the main delta loop needs to stop
87  * when this happens.
88  *
89  * In pysync a 'last' attribute is used to hold the last miss or match for
90  * extending if possible. In this code, basis_len and scoop_pos are used
91  * instead of 'last'. When basis_len > 0, last is a match. When basis_len =
92  * 0 and scoop_pos is > 0, last is a miss. When both are 0, last is None
93  * (ie, nothing).
94  *
95  * Pysync is also slightly different in that a 'flush' method is available
96  * to force output of accumulated data. This 'flush' is use to finalise
97  * delta calculation. In librsync input is terminated with an eof flag on
98  * the input stream. I have structured this code similar to pysync with a
99  * seperate flush function that is used when eof is reached. This allows
100  * for a flush style API if one is ever needed. Note that flush in pysync
101  * can be used for more than just terminating delta calculation, so a flush
102  * based API can in some ways be more flexible...
103  *
104  * The input data is first scanned, then processed. Scanning identifies
105  * input data as misses or matches, and emits the instruction stream.
106  * Processing the data consumes it off the input scoop and outputs the
107  * processed miss data into the tube.
108  *
109  * The scoop contains all data yet to be processed. The scoop_pos is an
110  * index into the scoop that indicates the point scanned to. As data is
111  * scanned, scoop_pos is incremented. As data is processed, it is removed
112  * from the scoop and scoop_pos adjusted. Everything gets complicated
113  * because the tube can block. When the tube is blocked, no data can be
114  * processed.
115  *
116  */
117 
118 /* used by rdiff, but now redundant */
119 int rs_roll_paranoia = 0;
120 
121 static rs_result rs_delta_s_scan(rs_job_t *job);
122 static rs_result rs_delta_s_flush(rs_job_t *job);
123 static rs_result rs_delta_s_end(rs_job_t *job);
124 void rs_getinput(rs_job_t *job);
125 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len);
126 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len);
127 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
128 static inline rs_result rs_appendflush(rs_job_t *job);
129 static inline rs_result rs_processmatch(rs_job_t *job);
130 static inline rs_result rs_processmiss(rs_job_t *job);
131 
132 /**
133  * \brief Get a block of data if possible, and see if it matches.
134  *
135  * On each call, we try to process all of the input data available on the
136  * scoop and input buffer. */
137 static rs_result rs_delta_s_scan(rs_job_t *job)
138 {
139  const size_t block_len = job->signature->block_len;
140  rs_long_t match_pos;
141  size_t match_len;
142  rs_result result;
143  Rollsum test;
144 
145  rs_job_check(job);
146  /* read the input into the scoop */
147  rs_getinput(job);
148  /* output any pending output from the tube */
149  result=rs_tube_catchup(job);
150  /* while output is not blocked and there is a block of data */
151  while ((result==RS_DONE) &&
152  ((job->scoop_pos + block_len) < job->scoop_avail)) {
153  /* check if this block matches */
154  if (rs_findmatch(job,&match_pos,&match_len)) {
155  /* append the match and reset the weak_sum */
156  result=rs_appendmatch(job,match_pos,match_len);
157  RollsumInit(&job->weak_sum);
158  } else {
159  /* rotate the weak_sum and append the miss byte */
160  RollsumRotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
161  job->scoop_next[job->scoop_pos + block_len]);
162  result=rs_appendmiss(job,1);
163  if (rs_roll_paranoia) {
164  RollsumInit(&test);
165  RollsumUpdate(&test, job->scoop_next + job->scoop_pos, block_len);
166  if (RollsumDigest(&test) != RollsumDigest(&job->weak_sum)) {
167  rs_fatal("mismatch between rolled sum %#x and check %#x",
168  (int)RollsumDigest(&job->weak_sum),
169  (int)RollsumDigest(&test));
170  }
171 
172  }
173  }
174  }
175  /* if we completed OK */
176  if (result==RS_DONE) {
177  /* if we reached eof, we can flush the last fragment */
178  if (job->stream->eof_in) {
179  job->statefn=rs_delta_s_flush;
180  return RS_RUNNING;
181  } else {
182  /* we are blocked waiting for more data */
183  return RS_BLOCKED;
184  }
185  }
186  return result;
187 }
188 
189 
190 static rs_result rs_delta_s_flush(rs_job_t *job)
191 {
192  rs_long_t match_pos;
193  size_t match_len;
194  rs_result result;
195 
196  rs_job_check(job);
197  /* read the input into the scoop */
198  rs_getinput(job);
199  /* output any pending output */
200  result=rs_tube_catchup(job);
201  /* while output is not blocked and there is any remaining data */
202  while ((result==RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
203  /* check if this block matches */
204  if (rs_findmatch(job,&match_pos,&match_len)) {
205  /* append the match and reset the weak_sum */
206  result=rs_appendmatch(job,match_pos,match_len);
207  RollsumInit(&job->weak_sum);
208  } else {
209  /* rollout from weak_sum and append the miss byte */
210  RollsumRollout(&job->weak_sum,job->scoop_next[job->scoop_pos]);
211  rs_trace("block reduced to %d", (int)job->weak_sum.count);
212  result=rs_appendmiss(job,1);
213  }
214  }
215  /* if we are not blocked, flush and set end statefn. */
216  if (result==RS_DONE) {
217  result=rs_appendflush(job);
218  job->statefn=rs_delta_s_end;
219  }
220  if (result==RS_DONE) {
221  return RS_RUNNING;
222  }
223  return result;
224 }
225 
226 
227 static rs_result rs_delta_s_end(rs_job_t *job)
228 {
229  rs_emit_end_cmd(job);
230  return RS_DONE;
231 }
232 
233 
234 void rs_getinput(rs_job_t *job) {
235  size_t len;
236 
237  len=rs_scoop_total_avail(job);
238  if (job->scoop_avail < len) {
239  rs_scoop_input(job,len);
240  }
241 }
242 
243 
244 /**
245  * find a match at scoop_pos, returning the match_pos and match_len.
246  * Note that this will calculate weak_sum if required. It will also
247  * determine the match_len.
248  *
249  * Note that this routine could be modified to do xdelta style matches that
250  * would extend matches past block boundaries by matching backwards and
251  * forwards beyond the block boundaries. Extending backwards would require
252  * decrementing scoop_pos as appropriate.
253  */
254 inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len) {
255  const size_t block_len = job->signature->block_len;
256 
257  /* calculate the weak_sum if we don't have one */
258  if (job->weak_sum.count == 0) {
259  /* set match_len to min(block_len, scan_avail) */
260  *match_len=job->scoop_avail - job->scoop_pos;
261  if (*match_len > block_len) {
262  *match_len = block_len;
263  }
264  /* Update the weak_sum */
265  RollsumUpdate(&job->weak_sum,job->scoop_next+job->scoop_pos,*match_len);
266  rs_trace("calculate weak sum from scratch length %d",(int)job->weak_sum.count);
267  } else {
268  /* set the match_len to the weak_sum count */
269  *match_len=job->weak_sum.count;
270  }
271  *match_pos = rs_signature_find_match(job->signature,
272  RollsumDigest(&job->weak_sum),
273  job->scoop_next+job->scoop_pos,
274  *match_len);
275  return *match_pos != -1;
276 }
277 
278 
279 /**
280  * Append a match at match_pos of length match_len to the delta, extending
281  * a previous match if possible, or flushing any previous miss/match. */
282 inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
283 {
284  rs_result result=RS_DONE;
285 
286  /* if last was a match that can be extended, extend it */
287  if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
288  job->basis_len+=match_len;
289  } else {
290  /* else appendflush the last value */
291  result=rs_appendflush(job);
292  /* make this the new match value */
293  job->basis_pos=match_pos;
294  job->basis_len=match_len;
295  }
296  /* increment scoop_pos to point at next unscanned data */
297  job->scoop_pos+=match_len;
298  /* we can only process from the scoop if output is not blocked */
299  if (result==RS_DONE) {
300  /* process the match data off the scoop*/
301  result=rs_processmatch(job);
302  }
303  return result;
304 }
305 
306 
307 /**
308  * Append a miss of length miss_len to the delta, extending a previous miss
309  * if possible, or flushing any previous match.
310  *
311  * This also breaks misses up into rs_outbuflen segments to avoid accumulating
312  * too much in memory. */
313 inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
314 {
315  rs_result result=RS_DONE;
316 
317  /* If last was a match, or rs_outbuflen misses, appendflush it. */
318  if (job->basis_len || (job->scoop_pos >= rs_outbuflen)) {
319  result=rs_appendflush(job);
320  }
321  /* increment scoop_pos */
322  job->scoop_pos+=miss_len;
323  return result;
324 }
325 
326 
327 /**
328  * Flush any accumulating hit or miss, appending it to the delta.
329  */
330 inline rs_result rs_appendflush(rs_job_t *job)
331 {
332  /* if last is a match, emit it and reset last by resetting basis_len */
333  if (job->basis_len) {
334  rs_trace("matched " PRINTF_FORMAT_U64 " bytes at " PRINTF_FORMAT_U64 "!",
335  PRINTF_CAST_U64(job->basis_len),
336  PRINTF_CAST_U64(job->basis_pos));
337  rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
338  job->basis_len=0;
339  return rs_processmatch(job);
340  /* else if last is a miss, emit and process it*/
341  } else if (job->scoop_pos) {
342  rs_trace("got %ld bytes of literal data", (long) job->scoop_pos);
343  rs_emit_literal_cmd(job, job->scoop_pos);
344  return rs_processmiss(job);
345  }
346  /* otherwise, nothing to flush so we are done */
347  return RS_DONE;
348 }
349 
350 
351 /**
352  * The scoop contains match data at scoop_next of length scoop_pos. This
353  * function processes that match data, returning RS_DONE if it completes,
354  * or RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset
355  * to still point at the next unscanned data.
356  *
357  * This function currently just removes data from the scoop and adjusts
358  * scoop_pos appropriately. In the future this could be used for something
359  * like context compressing of miss data. Note that it also calls
360  * rs_tube_catchup to output any pending output. */
361 inline rs_result rs_processmatch(rs_job_t *job)
362 {
363  job->scoop_avail-=job->scoop_pos;
364  job->scoop_next+=job->scoop_pos;
365  job->scoop_pos=0;
366  return rs_tube_catchup(job);
367 }
368 
369 /**
370  * The scoop contains miss data at scoop_next of length scoop_pos. This
371  * function processes that miss data, returning RS_DONE if it completes, or
372  * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
373  * still point at the next unscanned data.
374  *
375  * This function uses rs_tube_copy to queue copying from the scoop into
376  * output. and uses rs_tube_catchup to do the copying. This automaticly
377  * removes data from the scoop, but this can block. While rs_tube_catchup
378  * is blocked, scoop_pos does not point at legit data, so scanning can also
379  * not proceed.
380  *
381  * In the future this could do compression of miss data before outputing
382  * it. */
383 inline rs_result rs_processmiss(rs_job_t *job)
384 {
385  rs_tube_copy(job, job->scoop_pos);
386  job->scoop_pos=0;
387  return rs_tube_catchup(job);
388 }
389 
390 
391 /**
392  * \brief State function that does a slack delta containing only
393  * literal data to recreate the input.
394  */
395 static rs_result rs_delta_s_slack(rs_job_t *job)
396 {
397  rs_buffers_t * const stream = job->stream;
398  size_t avail = stream->avail_in;
399 
400  if (avail) {
401  rs_trace("emit slack delta for " PRINTF_FORMAT_U64
402  " available bytes", PRINTF_CAST_U64(avail));
403  rs_emit_literal_cmd(job, avail);
404  rs_tube_copy(job, avail);
405  return RS_RUNNING;
406  } else if (rs_job_input_is_ending(job)) {
407  job->statefn = rs_delta_s_end;
408  return RS_RUNNING;
409  }
410  return RS_BLOCKED;
411 }
412 
413 
414 /**
415  * State function for writing out the header of the encoding job.
416  */
417 static rs_result rs_delta_s_header(rs_job_t *job)
418 {
419  rs_emit_delta_header(job);
420  if (job->signature) {
421  job->statefn = rs_delta_s_scan;
422  } else {
423  rs_trace("no signature provided for delta, using slack deltas");
424  job->statefn = rs_delta_s_slack;
425  }
426  return RS_RUNNING;
427 }
428 
429 
431 {
432  rs_job_t *job;
433 
434  job = rs_job_new("delta", rs_delta_s_header);
435  /* Caller can pass NULL sig for "slack deltas". */
436  if (sig) {
437  rs_signature_check(sig);
438  /* Caller must have called rs_build_hash_table() by now. */
439  assert(sig->hashtable);
440  job->signature = sig;
441  RollsumInit(&job->weak_sum);
442  }
443  return job;
444 }
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:46
Description of input and output buffers.
Definition: librsync.h:361
int block_len
The block length.
Definition: sumset.h:41
rs_signature_t * signature
Pointer to the signature that&#39;s being used by the operation.
Definition: job.h:54
long long rs_long_t
A long integer type that can handle the largest file offsets.
size_t avail_in
Number of bytes available at next_in References the length of available input.
Definition: librsync.h:376
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:98
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:38
Public header for librsync.
Signature of a whole file.
Definition: sumset.h:39
Rollsum weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:66
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:193
rs_job_t * rs_delta_begin(rs_signature_t *)
Prepare to compute a streaming delta.
Definition: delta.c:430
Blocked waiting for more data.
Definition: librsync.h:195
The job is still running, and not yet finished or blocked.
Definition: librsync.h:199
int eof_in
True if there is no more data after this.
Definition: librsync.h:381
Completed successfully.
Definition: librsync.h:194
The contents of this structure are private.
Definition: job.h:29