root/branches/add_stat_caching/ftpsync-0.1/sync.py

Revision 545, 13.0 kB (checked in by schwa, 2 years ago)
Code contributed by Martin Wilck. (Thank you!) I might use some of
this code to add caching of stat results to ftputil.
  • Property svn:eol-style set to native
Line 
1 import os
2 import sys
3 from loggingclass import LoggingClass, NOTICE
4 from rsyncmatch import GlobChain, EXCLUDE
5
class Synchronizer(LoggingClass):
    """
    A class for synchronizing directories between two file systems.

    Each side is accessed through an "IO object" offering the parts of
    the 'os' module interface used here (listdir, stat, unlink, mkdir,
    rmdir, path.*), typically 'os' itself or an ftputil.FTPHost.

    Usage example:
    sync = Synchronizer(os, os, "/source", "/target")
    sync.sync("subdir")

    This will synchronize "/source/subdir" to "/target/subdir".

    All messages go to self.logger, which is not created in this class
    (presumably supplied by LoggingClass -- confirm there).
    """

    class SyncAction:
        """
        Plain record of the actions planned for one directory level.

        All lists hold bare entry names relative to the directory that
        is currently being synchronized, not full paths.
        """
        def __init__(self):
            self.unl = []   # stuff to unlink
            self.cpy = []   # stuff to copy
            self.rmd = []   # stuff to rmdir (removed recursively)
            self.mkd = []   # stuff to mkdir
            self.dsc = []   # dirs to descend into

    class FileSys:
        """
        A helper class for Synchronizer. Another abstraction layer above
        'os' and other filesystem access (e.g. FTP).

        It inherits most attributes from its '_io' element (typically
        'os') through __getattr__ and adds the synchronization root.
        """

        def __init__(self, io, root):
            """
            io: the wrapped IO object ('os' or an os-like object)
            root: root directory for synchronization on this file system
            """
            self._io = io
            self.root = root

        def open(self, *args):
            """
            Open a file on the file system and return the file object.
            """
            # os.open() is the low-level file descriptor call, so local
            # files have to be opened with the builtin open() instead.
            if self._io is os:
                return open(*args)
            return self._io.open(*args)

        def eq(self, x, y):
            """
            Boolean: True if file names x and y are equal by this file
            system's rules. This refers mainly to case-sensitiveness.
            """
            return self._io.path.normcase(x) == self._io.path.normcase(y)

        def cmp(self, x, y):
            """
            Compare file names by this file system's rules.
            returns: -1, 0 or 1, like the Python 2 builtin cmp().
            """
            norm_x = self._io.path.normcase(x)
            norm_y = self._io.path.normcase(y)
            # The cmp() builtin does not exist in Python 3; this
            # expression gives the same result on both versions.
            return (norm_x > norm_y) - (norm_x < norm_y)

        def __getattr__(self, attr):
            # Only called for attributes not found on the instance.
            # Without this guard a lookup of '_io' before __init__ has
            # run (e.g. while copying the object) would recurse forever.
            if attr == "_io":
                raise AttributeError(attr)
            return getattr(self._io, attr)

    def __init__(self, io_s, io_t, root_s, root_t,
                 mode = "b", blocksize = 65536,
                 delete=False, delete_excluded=False,
                 dry_run=False):
        """
        io_s, io_t: "IO class" of the source and target, respectively.
           typically 'os' or an ftputil.FTPHost
        root_s, root_t: root directories for synchronization on source
           and target, respectively.
        mode: file open() mode (usually 'b')
        blocksize: block size for copying (default: 64kB)
        delete: whether to delete additional files on target (default: false)
        delete_excluded: whether to delete files which were excluded, similar
           to rsync's --delete-excluded option. See exclude() method.
        dry_run: if true, only log what would be done; nothing is
           changed on the target.
        """
        self.io_s = self.FileSys(io_s, root_s)
        self.io_t = self.FileSys(io_t, root_t)
        self.mode = mode
        self.dry_run = dry_run
        self.blocksize = blocksize
        self.delete = delete
        self.delete_excluded = delete_excluded
        self.logger.info("options: delete=%s, delete-excluded=%s, dry-run=%s"
                         % (self.delete, self.delete_excluded, self.dry_run))

    def _rm_rf(self, path):
        """
        Recursively remove directory 'path' on the target.

        Failures on individual entries are logged and do not stop the
        removal of the remaining entries. The last such OSError is
        re-raised after the final rmdir of 'path' has been attempted.
        With dry_run set, nothing is deleted.
        """
        err = None
        for f in self.io_t.listdir(path):
            absl = self.io_t.path.join(path, f)
            if self.isdir(self.io_t, absl):
                try:
                    self._rm_rf(absl)
                except OSError:
                    self.logger.exception("rmdir %s" % absl)
                    err = sys.exc_info()[1]
            else:
                self.logger.debug("delete %s" % absl)
                try:
                    if not self.dry_run:
                        self.io_t.unlink(absl)
                except OSError:
                    self.logger.exception("delete %s" % absl)
                    err = sys.exc_info()[1]
        self.logger.debug("rmdir %s" % path)
        if not self.dry_run:
            self.io_t.rmdir(path)
        if err is not None:
            # Raising the saved instance is valid in Python 2 and 3,
            # unlike the two-expression 'raise cls, value' form.
            raise err

    def rm_rf(self, path):
        """
        Remove directory recursively (on the target file system).
        """
        absl = self.io_t.path.abspath(path)
        self._rm_rf(absl)

    def _pull(self, x, lst, eq):
        """
        x: file name
        lst: list of file names
        eq: function to check file name equality
        returns: true if file was matched
        side effects: removes all matching entries from lst
        """
        oldlen = len(lst)
        # Slice assignment modifies the caller's list in place. Deleting
        # by index while stepping forward would skip the entry that
        # follows each removed one.
        lst[:] = [name for name in lst if not eq(x, name)]
        return len(lst) < oldlen

    def exclude(self, dir, name, isdir):
        """
        (virtual): this implementation returns always False.
        dir: parent directory
        name: file name
        isdir: True iff name represents a directory itself
        returns: True if file is to be excluded.
        """
        return False

    def need_copy(self, src, tgt):
        """
        src, tgt: corresponding files on source and target
        returns: a "reason string" if src needs to be copied to tgt.
                 the empty string otherwise.
        This default implementation returns non-"" if the file
        sizes differ ("size"), or if src is newer than tgt ("date").
        """
        ret = ""
        stat_s = self.io_s.stat(src)
        stat_t = self.io_t.stat(tgt)
        if stat_s.st_size != stat_t.st_size:
            ret = "size"
            self.logger.debug("%s: sizes differ: %d %d" %
                              (tgt, stat_s.st_size, stat_t.st_size))
        elif stat_s.st_mtime > stat_t.st_mtime:
            ret = "date"
            self.logger.debug("%s: source is newer by %s s" %
                              (tgt, stat_s.st_mtime - stat_t.st_mtime))
        return ret

    def _make_pattern(self, path, isdir):
        """
        Return 'path' in the form used for pattern matching:
        directories get a trailing "/", other paths are unchanged.
        """
        if isdir:
            path = path + "/"
        return path

    def isdir(self, io, path):
        """
        True if 'path' on file system 'io' is a real directory.
        Symbolic links to directories do not count.
        """
        return io.path.isdir(path) and not io.path.islink(path)

    def copy(self, abs_s, abs_t):
        """
        Copy source file abs_s to target file abs_t in blocks of
        self.blocksize.

        IOError/OSError are logged, not raised. If the copy fails after
        the target file was opened, the partial target file is removed.
        """
        src = None
        tgt = None
        copied = False
        try:
            try:
                src = self.io_s.open(abs_s, "r" + self.mode)
                tgt = self.io_t.open(abs_t, "w" + self.mode)
                while True:
                    buffer = src.read(self.blocksize)
                    if not buffer:
                        break
                    tgt.write(buffer)
                copied = True
            except (IOError, OSError):
                self.logger.exception("error copying to %s" % abs_t)
        finally:
            # Close whatever was opened, each file independently, so a
            # failing close of one cannot leak the other.
            for handle in (src, tgt):
                if handle is None:
                    continue
                try:
                    handle.close()
                except Exception:
                    # Errors on close were always tolerated here; log
                    # them rather than dropping them silently.
                    self.logger.exception(
                        "error closing file after copying to %s" % abs_t)

        if not copied and tgt is not None:
            # Remove the incomplete file. It lives on the *target* file
            # system, so it must be unlinked through io_t.
            try:
                self.io_t.unlink(abs_t)
            except (IOError, OSError):
                self.logger.exception("error unlinking %s" % abs_t)

    def _unique(self, lst, eq):
        """
        Remove duplicate entries in list lst, using equality relation eq.
        The first of several equal entries is kept; lst is modified
        in place and a warning is logged for every dropped entry.
        """
        i = 0
        while i < len(lst):
            j = i + 1
            while j < len(lst):
                if eq(lst[i], lst[j]):
                    self.logger.warn("skipping %s (duplicate of %s)"
                                     % (lst[j], lst[i]))
                    # No increment: the next entry moves to index j.
                    del lst[j]
                else:
                    j = j + 1
            i = i + 1

    def sync(self, path, _top=True):
        """
        Main work horse of Synchronizer class.
        Synchronize directory 'path' between source and target.

        path: directory to synchronize, relative to both roots
        _top: True for the initial call, False for recursive calls
              (only affects logging).

        Called recursively. Call with _top = True initially.
        """

        # All action items are recorded in this "todo" list.
        # Actions are only put into effect when the list is
        # complete.
        todo = self.SyncAction()
        # 'reason' is a map that stores the reasons why we transfer
        # files (regular files only). This is just informational.
        reason = {}

        path_s = self.io_s.path.join(self.io_s.root, path)
        path_t = self.io_t.path.join(self.io_t.root, path)

        if _top:
            self.logger.info("sync starting: %s -> %s" % (path_s, path_t))
        else:
            self.logger.debug("sync: %s -> %s" % (path_s, path_t))

        lst_s = self.io_s.listdir(path_s)
        try:
            lst_t = self.io_t.listdir(path_t)
        except OSError:
            # In a dry run, a directory that would have been created
            # by the parent call does not exist on the target.
            if self.dry_run:
                lst_t = []
            else:
                raise

        # in case io_s or io_t are case-insensitive, remove duplicate
        # file names.
        self._unique(lst_s, self.io_t.eq)
        self._unique(lst_t, self.io_s.eq)

        for x in lst_s:

            abs_s = self.io_s.path.join(path_s, x)
            isdir_s = self.isdir(self.io_s, abs_s)

            if not isdir_s and not self.io_s.path.isfile(abs_s):
                self.logger.info("skipping non-file %s" %  abs_s)
                continue

            # This deletes x from lst_t. That enables us to simply
            # iterate over lst_t later to find files to be deleted.
            exists_t = self._pull(x, lst_t, self.io_t.eq)
            abs_t = self.io_t.path.join(path_t, x)
            # Always bound, and only looked up when the target exists.
            isdir_t = exists_t and self.isdir(self.io_t, abs_t)

            if self.exclude(path, x, isdir_s):
                self.logger.debug("exclude src %s/%s" % (path, x))
                if self.delete and self.delete_excluded and exists_t:
                    self.logger.log(NOTICE, "delete excluded %s/%s" % (path, x))
                    if isdir_t:
                        todo.rmd.append(x)
                    else:
                        todo.unl.append(x)
                continue

            # Here we know: src exists and is not excluded.
            if isdir_s:
                todo.dsc.append(x)

            if exists_t:
                if isdir_s:
                    if not isdir_t:
                        # A file is in the way of a directory.
                        todo.unl.append(x)
                        todo.mkd.append(x)
                else:
                    if isdir_t:
                        # A directory is in the way of a file.
                        todo.rmd.append(x)
                        todo.cpy.append(x)
                        reason[x] = "type"
                    else:
                        rsn = self.need_copy(abs_s, abs_t)
                        if rsn:
                            todo.unl.append(x)
                            todo.cpy.append(x)
                            reason[x] = "%s" % rsn
            else: # not exists_t
                if isdir_s:
                    todo.mkd.append(x)
                else:
                    reason[x] = "new"
                    todo.cpy.append(x)
        # for loop over lst_s ends

        if self.delete:

            # Anything now in lst_t didn't exist in src (see above)
            for x in lst_t:

                abs_t = self.io_t.path.join(path_t, x)
                isdir_t = self.isdir(self.io_t, abs_t)

                if self.exclude(path, x, isdir_t):
                    self.logger.debug("exclude tgt %s/%s" % (path, x))
                    if not self.delete_excluded:
                        continue

                self.logger.info("delete %s/%s" % (path, x))
                if isdir_t:
                    todo.rmd.append(x)
                else:
                    todo.unl.append(x)

        # From here on ACTIONS ARE CARRIED OUT
        # First all remove actions, then mkdir and copy
        for x in todo.rmd:
            try:
                self.logger.log(NOTICE, "rm -rf: %s/%s" % (path, x))
                # rm_rf honours dry_run itself.
                self.rm_rf(self.io_t.path.join(path_t, x))
            except (OSError, IOError):
                self.logger.exception("failed to rmdir %s" % x)

        for x in todo.unl:
            try:
                self.logger.log(NOTICE, "delete: %s/%s" % (path, x))
                if not self.dry_run:
                    self.io_t.unlink(self.io_t.path.join(path_t, x))
            except (OSError, IOError):
                self.logger.exception("failed to unlink %s" % x)

        for x in todo.mkd:
            try:
                self.logger.log(NOTICE, "mkdir: %s/%s" % (path, x))
                if not self.dry_run:
                    self.io_t.mkdir(self.io_t.path.join(path_t, x))
            except (OSError, IOError):
                self.logger.exception("failed to mkdir %s" % x)
                # Without the target directory there is nothing to
                # descend into.
                if x in todo.dsc:
                    todo.dsc.remove(x)

        for x in todo.cpy:
            self.logger.log(NOTICE, "copy: %s/%s (reason: %s)"
                 % (path, x, reason[x]))
            if not self.dry_run:
                # The target path is built with the target's path rules.
                self.copy(self.io_s.path.join(path_s, x),
                          self.io_t.path.join(path_t, x))

        # Finally, recurse.
        for x in todo.dsc:
            self.sync(self.io_s.path.join(path, x), False)

        if _top:
            self.logger.info("sync finshed: %s -> %s" % (path_s, path_t))
371
372        
class RsyncSynchronizer(Synchronizer):
    """
    Special Synchronizer class that uses rsyncmatch.GlobChain
    for include/exclude logic.

    The chain is created empty in self.globchain; exclude() asks it
    about every entry.
    """
    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as Synchronizer.__init__.
        """
        Synchronizer.__init__(self, *args, **kwargs)
        self.globchain = GlobChain()

    def exclude(self, dir, name, isdir):
        """
        dir: parent directory
        name: file name
        isdir: True iff name represents a directory itself
        returns: True if the glob chain's match result for the entry
                 equals EXCLUDE.
        """
        # Use the inherited helper for the trailing "/" on directories
        # instead of repeating that rule here.
        path = self._make_pattern(self.io_s.path.join(dir, name), isdir)
        return self.globchain.match(path) == EXCLUDE
Note: See TracBrowser for help on using the browser.