librsync  2.3.4
delta.c
Go to the documentation of this file.
1/*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2 *
3 * librsync -- library for network deltas
4 *
5 * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6 * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation; either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /*=
24 | Let's climb to the TOP of that
25 | MOUNTAIN and think about STRIP
26 | MINING!!
27 */
28
29/** \file delta.c
30 * Generate in streaming mode an rsync delta given a set of signatures, and a
31 * new file.
32 *
33 * The size of blocks for signature generation is determined by the block size
34 * in the incoming signature.
35 *
36 * To calculate a signature, we need to be able to see at least one block of
37 * the new file at a time. Once we have that, we calculate its weak signature,
38 * and see if there is any block in the signature hash table that has the same
39 * weak sum. If there is one, then we also compute the strong sum of the new
40 * block, and cross check that. If they're the same, then we can assume we have
41 * a match.
42 *
43 * The final block of the file has to be handled a little differently, because
44 * it may be a short match. Short blocks in the signature don't include their
45 * length -- we just allow for the final short block of the file to match any
46 * block in the signature, and if they have the same checksum we assume they
47 * must have the same length. Therefore, when we emit a COPY command, we have
48 * to send it with a length that is the same as the block matched, and not the
49 * block length from the signature.
50 *
51 * Profiling results as of v1.26, 2001-03-18:
52 *
53 * If everything matches, then we spend almost all our time in rs_mdfour64 and
54 * rs_weak_sum, which is unavoidable and therefore a good profile.
55 *
56 * If nothing matches, it is not so good.
57 *
58 * 2002-06-26: Donovan Baarda
59 *
60 * The following is based entirely on pysync. It is much cleaner than the
61 * previous incarnation of this code. It is slightly complicated because in
62 * this case the output can block, so the main delta loop needs to stop when
63 * this happens.
64 *
65 * In pysync a 'last' attribute is used to hold the last miss or match for
66 * extending if possible. In this code, basis_len and scan_pos are used instead
67 * of 'last'. When basis_len > 0, last is a match. When basis_len = 0 and
68 * scan_pos is > 0, last is a miss. When both are 0, last is None (ie,
69 * nothing).
70 *
71 * Pysync is also slightly different in that a 'flush' method is available to
72 * force output of accumulated data. This 'flush' is use to finalise delta
73 * calculation. In librsync input is terminated with an eof flag on the input
74 * stream. I have structured this code similar to pysync with a seperate flush
75 * function that is used when eof is reached. This allows for a flush style API
76 * if one is ever needed. Note that flush in pysync can be used for more than
77 * just terminating delta calculation, so a flush based API can in some ways be
78 * more flexible...
79 *
80 * The input data is first scanned, then processed. Scanning identifies input
81 * data as misses or matches, and emits the instruction stream. Processing the
82 * data consumes it off the input scoop and outputs the processed miss data
83 * into the tube.
84 *
85 * The scoop contains all data yet to be processed. The scan_pos is an index
86 * into the scoop that indicates the point scanned to. As data is scanned,
87 * scan_pos is incremented. As data is processed, it is removed from the scoop
88 * and scan_pos adjusted. Everything gets complicated because the tube can
89 * block. When the tube is blocked, no data can be processed. */
90
91#include "config.h" /* IWYU pragma: keep */
92#include <assert.h>
93#include <stdlib.h>
94#include "librsync.h"
95#include "job.h"
96#include "sumset.h"
97#include "checksum.h"
98#include "scoop.h"
99#include "emit.h"
100#include "trace.h"
101
102/** Max length of a miss is 64K including 3 command bytes. */
103#define MAX_MISS_LEN (MAX_DELTA_CMD - 3)
104
106static rs_result rs_delta_s_flush(rs_job_t *job);
107static rs_result rs_delta_s_end(rs_job_t *job);
108static inline rs_result rs_getinput(rs_job_t *job, size_t block_len);
109static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
110 size_t *match_len);
111static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
112 size_t match_len);
113static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
114static inline rs_result rs_appendflush(rs_job_t *job);
115static inline rs_result rs_processmatch(rs_job_t *job);
116static inline rs_result rs_processmiss(rs_job_t *job);
117
118/** Get a block of data if possible, and see if it matches.
119 *
120 * On each call, we try to process all of the input data available on the scoop
121 * and input buffer. */
123{
124 const size_t block_len = job->signature->block_len;
125 rs_long_t match_pos;
126 size_t match_len;
127 rs_result result;
128
129 rs_job_check(job);
130 /* output any pending output from the tube */
131 if ((result = rs_tube_catchup(job)) != RS_DONE)
132 return result;
133 /* read the input into the scoop */
134 if ((result = rs_getinput(job, block_len)) != RS_DONE)
135 return result;
136 /* while output is not blocked and there is a block of data */
137 while ((result == RS_DONE) && ((job->scan_pos + block_len) < job->scan_len)) {
138 /* check if this block matches */
139 if (rs_findmatch(job, &match_pos, &match_len)) {
140 /* append the match and reset the weak_sum */
141 result = rs_appendmatch(job, match_pos, match_len);
142 weaksum_reset(&job->weak_sum);
143 } else {
144 /* rotate the weak_sum and append the miss byte */
145 weaksum_rotate(&job->weak_sum, job->scan_buf[job->scan_pos],
146 job->scan_buf[job->scan_pos + block_len]);
147 result = rs_appendmiss(job, 1);
148 }
149 }
150 /* if we completed OK */
151 if (result == RS_DONE) {
152 /* if we reached eof, we can flush the last fragment */
153 if (job->stream->eof_in) {
154 job->statefn = rs_delta_s_flush;
155 return RS_RUNNING;
156 } else {
157 /* we are blocked waiting for more data */
158 return RS_BLOCKED;
159 }
160 }
161 return result;
162}
163
164static rs_result rs_delta_s_flush(rs_job_t *job)
165{
166 const size_t block_len = job->signature->block_len;
167 rs_long_t match_pos;
168 size_t match_len;
169 rs_result result;
170
171 rs_job_check(job);
172 /* output any pending output from the tube */
173 if ((result = rs_tube_catchup(job)) != RS_DONE)
174 return result;
175 /* read the input into the scoop */
176 if ((result = rs_getinput(job, block_len)) != RS_DONE)
177 return result;
178 /* while output is not blocked and there is any remaining data */
179 while ((result == RS_DONE) && (job->scan_pos < job->scan_len)) {
180 /* check if this block matches */
181 if (rs_findmatch(job, &match_pos, &match_len)) {
182 /* append the match and reset the weak_sum */
183 result = rs_appendmatch(job, match_pos, match_len);
184 weaksum_reset(&job->weak_sum);
185 } else {
186 /* rollout from weak_sum and append the miss byte */
187 weaksum_rollout(&job->weak_sum, job->scan_buf[job->scan_pos]);
188 rs_trace("block reduced to " FMT_SIZE "",
189 weaksum_count(&job->weak_sum));
190 result = rs_appendmiss(job, 1);
191 }
192 }
193 /* if we are not blocked, flush and set end statefn. */
194 if (result == RS_DONE) {
195 result = rs_appendflush(job);
196 job->statefn = rs_delta_s_end;
197 }
198 if (result == RS_DONE) {
199 return RS_RUNNING;
200 }
201 return result;
202}
203
204static rs_result rs_delta_s_end(rs_job_t *job)
205{
206 rs_emit_end_cmd(job);
207 return RS_DONE;
208}
209
210static inline rs_result rs_getinput(rs_job_t *job, size_t block_len)
211{
212 size_t min_len = block_len + MAX_DELTA_CMD;
213
214 job->scan_len = rs_scoop_avail(job);
215 if (job->scan_len < min_len && !job->stream->eof_in)
216 job->scan_len = min_len;
217 return rs_scoop_readahead(job, job->scan_len, (void **)&job->scan_buf);
218}
219
220/** find a match at scan_pos, returning the match_pos and match_len.
221 *
222 * Note that this will calculate weak_sum if required. It will also determine
223 * the match_len.
224 *
225 * This routine could be modified to do xdelta style matches that would extend
226 * matches past block boundaries by matching backwards and forwards beyond the
227 * block boundaries. Extending backwards would require decrementing scan_pos as
228 * appropriate. */
229static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
230 size_t *match_len)
231{
232 const size_t block_len = job->signature->block_len;
233
234 /* calculate the weak_sum if we don't have one */
235 if (weaksum_count(&job->weak_sum) == 0) {
236 /* set match_len to min(block_len, scan_avail) */
237 *match_len = job->scan_len - job->scan_pos;
238 if (*match_len > block_len) {
239 *match_len = block_len;
240 }
241 /* Update the weak_sum */
242 weaksum_update(&job->weak_sum, job->scan_buf + job->scan_pos,
243 *match_len);
244 rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
245 weaksum_count(&job->weak_sum));
246 } else {
247 /* set the match_len to the weak_sum count */
248 *match_len = weaksum_count(&job->weak_sum);
249 }
250 *match_pos =
251 rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
252 job->scan_buf + job->scan_pos, *match_len);
253 return *match_pos != -1;
254}
255
256/** Append a match at match_pos of length match_len to the delta, extending a
257 * previous match if possible, or flushing any previous miss/match. */
258static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
259 size_t match_len)
260{
261 rs_result result = RS_DONE;
262
263 /* if last was a match that can be extended, extend it */
264 if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
265 job->basis_len += match_len;
266 } else {
267 /* else appendflush the last value */
268 result = rs_appendflush(job);
269 /* make this the new match value */
270 job->basis_pos = match_pos;
271 job->basis_len = match_len;
272 }
273 /* increment scan_pos to point at next unscanned data */
274 job->scan_pos += match_len;
275 /* we can only process from the scoop if output is not blocked */
276 if (result == RS_DONE) {
277 /* process the match data off the scoop */
278 result = rs_processmatch(job);
279 }
280 return result;
281}
282
283/** Append a miss of length miss_len to the delta, extending a previous miss
284 * if possible, or flushing any previous match.
285 *
286 * This also breaks misses up into 32KB segments to avoid accumulating too much
287 * in memory. */
288static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
289{
290 rs_result result = RS_DONE;
291
292 /* If last was a match, or MAX_MISS_LEN misses, appendflush it. */
293 if (job->basis_len || (job->scan_pos >= MAX_MISS_LEN)) {
294 result = rs_appendflush(job);
295 }
296 /* increment scan_pos */
297 job->scan_pos += miss_len;
298 return result;
299}
300
301/** Flush any accumulating hit or miss, appending it to the delta. */
303{
304 /* if last is a match, emit it and reset last by resetting basis_len */
305 if (job->basis_len) {
306 rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len,
307 job->basis_pos);
308 rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
309 job->basis_len = 0;
310 return rs_processmatch(job);
311 /* else if last is a miss, emit and process it */
312 } else if (job->scan_pos) {
313 rs_trace("got " FMT_SIZE " bytes of literal data", job->scan_pos);
314 rs_emit_literal_cmd(job, (int)job->scan_pos);
315 return rs_processmiss(job);
316 }
317 /* otherwise, nothing to flush so we are done */
318 return RS_DONE;
319}
320
321/** Process matching data in the scoop.
322 *
323 * The scoop contains match data at scan_buf of length scan_pos. This function
324 * processes that match data, returning RS_DONE if it completes, or RS_BLOCKED
325 * if it gets blocked. After it completes scan_pos is reset to still point at
326 * the next unscanned data.
327 *
328 * This function currently just removes data from the scoop and adjusts
329 * scan_pos appropriately. In the future this could be used for something like
330 * context compressing of miss data. Note that it also calls rs_tube_catchup to
331 * output any pending output. */
333{
334 assert(job->copy_len == 0);
335 rs_scoop_advance(job, job->scan_pos);
336 job->scan_buf += job->scan_pos;
337 job->scan_len -= job->scan_pos;
338 job->scan_pos = 0;
339 return rs_tube_catchup(job);
340}
341
342/** Process miss data in the scoop.
343 *
344 * The scoop contains miss data at scan_buf of length scan_pos. This function
345 * processes that miss data, returning RS_DONE if it completes, or RS_BLOCKED
346 * if it gets blocked. After it completes scan_pos is reset to still point at
347 * the next unscanned data.
348 *
349 * This function uses rs_tube_copy to queue copying from the scoop into output.
350 * and uses rs_tube_catchup to do the copying. This automaticly removes data
351 * from the scoop, but this can block. While rs_tube_catchup is blocked,
352 * scan_pos does not point at legit data, so scanning can also not proceed.
353 *
354 * In the future this could do compression of miss data before outputing it. */
356{
357 assert(job->write_len > 0);
358 rs_tube_copy(job, job->scan_pos);
359 job->scan_buf += job->scan_pos;
360 job->scan_len -= job->scan_pos;
361 job->scan_pos = 0;
362 return rs_tube_catchup(job);
363}
364
365/** State function that does a slack delta containing only literal data to
366 * recreate the input. */
368{
369 size_t avail = rs_scoop_avail(job);
370
371 if (avail) {
372 rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
373 rs_emit_literal_cmd(job, (int)avail);
374 rs_tube_copy(job, avail);
375 return RS_RUNNING;
376 } else if (rs_scoop_eof(job)) {
377 job->statefn = rs_delta_s_end;
378 return RS_RUNNING;
379 }
380 return RS_BLOCKED;
381}
382
383/** State function for writing out the header of the encoding job. */
385{
387 if (job->signature) {
389 } else {
390 rs_trace("no signature provided for delta, using slack deltas");
392 }
393 return RS_RUNNING;
394}
395
397{
398 rs_job_t *job;
399
400 job = rs_job_new("delta", rs_delta_s_header);
401 /* Caller can pass NULL sig or empty sig for "slack deltas". */
402 if (sig && sig->count > 0) {
404 /* Caller must have called rs_build_hash_table() by now. */
405 assert(sig->hashtable);
406 job->signature = sig;
407 weaksum_init(&job->weak_sum, rs_signature_weaksum_kind(sig));
408 }
409 return job;
410}
Abstract wrappers around different weaksum and strongsum implementations.
static int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len)
find a match at scan_pos, returning the match_pos and match_len.
Definition: delta.c:229
static rs_result rs_processmiss(rs_job_t *job)
Process miss data in the scoop.
Definition: delta.c:355
static rs_result rs_processmatch(rs_job_t *job)
Process matching data in the scoop.
Definition: delta.c:332
rs_job_t * rs_delta_begin(rs_signature_t *sig)
Prepare to compute a streaming delta.
Definition: delta.c:396
static rs_result rs_delta_s_scan(rs_job_t *job)
Get a block of data if possible, and see if it matches.
Definition: delta.c:122
static rs_result rs_delta_s_slack(rs_job_t *job)
State function that does a slack delta containing only literal data to recreate the input.
Definition: delta.c:367
static rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
Append a miss of length miss_len to the delta, extending a previous miss if possible,...
Definition: delta.c:288
static rs_result rs_appendflush(rs_job_t *job)
Flush any accumulating hit or miss, appending it to the delta.
Definition: delta.c:302
static rs_result rs_delta_s_header(rs_job_t *job)
State function for writing out the header of the encoding job.
Definition: delta.c:384
static rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
Append a match at match_pos of length match_len to the delta, extending a previous match if possible,...
Definition: delta.c:258
#define MAX_MISS_LEN
Max length of a miss is 64K including 3 command bytes.
Definition: delta.c:103
encoding output routines.
void rs_emit_delta_header(rs_job_t *)
Write the magic for the start of a delta.
Definition: emit.c:31
void rs_emit_copy_cmd(rs_job_t *job, rs_long_t where, rs_long_t len)
Write a COPY command for given offset and length.
Definition: emit.c:66
void rs_emit_end_cmd(rs_job_t *)
Write an END command.
Definition: emit.c:105
void rs_emit_literal_cmd(rs_job_t *, int len)
Write a LITERAL command.
Definition: emit.c:37
Generic state-machine interface.
#define rs_job_check(job)
Assert that a job is valid.
Definition: job.h:130
#define MAX_DELTA_CMD
Max length of a singled delta command is including command bytes.
Definition: job.h:44
Public header for librsync.
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:180
@ RS_RUNNING
The job is still running, and not yet finished or blocked.
Definition: librsync.h:183
@ RS_DONE
Completed successfully.
Definition: librsync.h:181
@ RS_BLOCKED
Blocked waiting for more data.
Definition: librsync.h:182
rs_result rs_scoop_readahead(rs_job_t *job, size_t len, void **ptr)
Read from scoop without advancing.
Definition: scoop.c:148
void rs_scoop_advance(rs_job_t *job, size_t len)
Advance the input cursor forward len bytes.
Definition: scoop.c:117
Manage librsync streams of IO.
rs_result rs_tube_catchup(rs_job_t *job)
Put whatever will fit from the tube into the output of the stream.
Definition: tube.c:120
static bool rs_scoop_eof(rs_job_t *job)
Test if the scoop has reached eof.
Definition: scoop.h:100
void rs_tube_copy(rs_job_t *job, size_t len)
Queue up a request to copy through len bytes from the input to the output of the stream.
Definition: tube.c:160
int eof_in
True if there is no more data after this.
Definition: librsync.h:345
The contents of this structure are private.
Definition: job.h:47
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:117
rs_byte_t * scan_buf
The delta scan buffer, where scan_buf[scan_pos..scan_len] is the data yet to be scanned.
Definition: job.h:104
size_t copy_len
If copy_len is >0, then that much data should be copied through from the input.
Definition: job.h:114
size_t scan_pos
The delta scan position.
Definition: job.h:106
size_t scan_len
The delta scan buffer length.
Definition: job.h:105
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:56
rs_signature_t * signature
Pointer to the signature that's being used by the operation.
Definition: job.h:72
weaksum_t weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:84
Signature of a whole file.
Definition: sumset.h:44
int count
Total number of blocks.
Definition: sumset.h:48
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:51
int block_len
The block length.
Definition: sumset.h:46
The rs_signature class implementation of a file signature.
rs_long_t rs_signature_find_match(rs_signature_t *sig, rs_weak_sum_t weak_sum, void const *buf, size_t len)
Find a matching block offset in a signature.
Definition: sumset.c:242
static weaksum_kind_t rs_signature_weaksum_kind(rs_signature_t const *sig)
Get the weaksum kind for a signature.
Definition: sumset.h:114
#define rs_signature_check(sig)
Assert that a signature is valid.
Definition: sumset.h:107
logging functions.