1 | # Copyright (C) 2003-2018, Stefan Schwarzer <sschwarzer@sschwarzer.net> |
---|
2 | # and ftputil contributors (see `doc/contributors.txt`) |
---|
3 | # See the file LICENSE for licensing terms. |
---|
4 | |
---|
5 | # Execute tests on a real FTP server (other tests use a mock server). |
---|
6 | # |
---|
7 | # This test writes some files and directories on the local client and |
---|
8 | # the remote server. You'll need write access in the login directory. |
---|
9 | # This test can take a few minutes because it has to wait to test the |
---|
10 | # timezone calculation. |
---|
11 | |
---|
12 | import ftplib |
---|
13 | import functools |
---|
14 | import gc |
---|
15 | import operator |
---|
16 | import os |
---|
17 | import time |
---|
18 | import stat |
---|
19 | |
---|
20 | import pytest |
---|
21 | |
---|
22 | import ftputil.error |
---|
23 | import ftputil.file_transfer |
---|
24 | import ftputil.session |
---|
25 | import ftputil.stat_cache |
---|
26 | |
---|
27 | import test |
---|
28 | |
---|
29 | |
---|
30 | def utc_local_time_shift(): |
---|
31 | """ |
---|
32 | Return the expected time shift in seconds assuming the server |
---|
33 | uses UTC in its listings and the client uses local time. |
---|
34 | |
---|
35 | This is needed because Pure-FTPd meanwhile seems to insist that |
---|
36 | the displayed time for files is in UTC. |
---|
37 | """ |
---|
38 | utc_tuple = time.gmtime() |
---|
39 | localtime_tuple = time.localtime() |
---|
40 | # To calculate the correct times shift, we need to ignore the |
---|
41 | # DST component in the localtime tuple, i. e. set it to 0. |
---|
42 | localtime_tuple = localtime_tuple[:-1] + (0,) |
---|
43 | time_shift_in_seconds = (time.mktime(utc_tuple) - |
---|
44 | time.mktime(localtime_tuple)) |
---|
45 | # To be safe, round the above value to units of 3600 s (1 hour). |
---|
46 | return round(time_shift_in_seconds / 3600.0) * 3600 |
---|
47 | |
---|
48 | # Difference between local times of server and client. If 0.0, server |
---|
49 | # and client use the same timezone. |
---|
50 | #EXPECTED_TIME_SHIFT = utc_local_time_shift() |
---|
51 | # Pure-FTPd seems to have changed its mind (see docstring of |
---|
52 | # `utc_local_time_shift`). |
---|
53 | EXPECTED_TIME_SHIFT = 0.0 |
---|
54 | |
---|
55 | |
---|
56 | class Cleaner: |
---|
57 | """ |
---|
58 | This class helps remove directories and files which might |
---|
59 | otherwise be left behind if a test fails in unexpected ways. |
---|
60 | """ |
---|
61 | |
---|
62 | def __init__(self, host): |
---|
63 | # The test class (probably `RealFTPTest`) and the helper |
---|
64 | # class share the same `FTPHost` object. |
---|
65 | self._host = host |
---|
66 | self._ftp_items = [] |
---|
67 | |
---|
68 | def add_dir(self, path): |
---|
69 | """Schedule a directory with path `path` for removal.""" |
---|
70 | self._ftp_items.append(("d", self._host.path.abspath(path))) |
---|
71 | |
---|
72 | def add_file(self, path): |
---|
73 | """Schedule a file with path `path` for removal.""" |
---|
74 | self._ftp_items.append(("f", self._host.path.abspath(path))) |
---|
75 | |
---|
76 | def clean(self): |
---|
77 | """ |
---|
78 | Remove the directories and files previously remembered. |
---|
79 | The removal works in reverse order of the scheduling with |
---|
80 | `add_dir` and `add_file`. |
---|
81 | |
---|
82 | Errors due to a removal are ignored. |
---|
83 | """ |
---|
84 | self._host.chdir("/") |
---|
85 | for type_, path in reversed(self._ftp_items): |
---|
86 | try: |
---|
87 | if type_ == "d": |
---|
88 | # If something goes wrong in `rmtree` we might |
---|
89 | # leave a mess behind. |
---|
90 | self._host.rmtree(path) |
---|
91 | elif type_ == "f": |
---|
92 | # Minor mess if `remove` fails |
---|
93 | self._host.remove(path) |
---|
94 | except ftputil.error.FTPError: |
---|
95 | pass |
---|
96 | |
---|
97 | |
---|
98 | class RealFTPTest: |
---|
99 | |
---|
100 | def setup_method(self, method): |
---|
101 | # Server, username, password. |
---|
102 | self.login_data = ("localhost", "ftptest", |
---|
103 | "d605581757de5eb56d568a4419f4126e") |
---|
104 | self.host = ftputil.FTPHost(*self.login_data) |
---|
105 | self.cleaner = Cleaner(self.host) |
---|
106 | |
---|
107 | def teardown_method(self, method): |
---|
108 | self.cleaner.clean() |
---|
109 | self.host.close() |
---|
110 | |
---|
111 | # |
---|
112 | # Helper methods |
---|
113 | # |
---|
114 | def make_remote_file(self, path): |
---|
115 | """Create a file on the FTP host.""" |
---|
116 | self.cleaner.add_file(path) |
---|
117 | with self.host.open(path, "wb") as file_: |
---|
118 | # Write something. Otherwise the FTP server might not update |
---|
119 | # the time of last modification if the file existed before. |
---|
120 | file_.write(b"\n") |
---|
121 | |
---|
122 | def make_local_file(self): |
---|
123 | """Create a file on the local host (= on the client side).""" |
---|
124 | with open("_local_file_", "wb") as fobj: |
---|
125 | fobj.write(b"abc\x12\x34def\t") |
---|
126 | |
---|
127 | |
---|
128 | class TestMkdir(RealFTPTest): |
---|
129 | |
---|
130 | def test_mkdir_rmdir(self): |
---|
131 | host = self.host |
---|
132 | dir_name = "_testdir_" |
---|
133 | file_name = host.path.join(dir_name, "_nonempty_") |
---|
134 | self.cleaner.add_dir(dir_name) |
---|
135 | # Make dir and check if the directory is there. |
---|
136 | host.mkdir(dir_name) |
---|
137 | files = host.listdir(host.curdir) |
---|
138 | assert dir_name in files |
---|
139 | # Try to remove a non-empty directory. |
---|
140 | self.cleaner.add_file(file_name) |
---|
141 | non_empty = host.open(file_name, "w") |
---|
142 | non_empty.close() |
---|
143 | with pytest.raises(ftputil.error.PermanentError): |
---|
144 | host.rmdir(dir_name) |
---|
145 | # Remove file. |
---|
146 | host.unlink(file_name) |
---|
147 | # `remove` on a directory should fail. |
---|
148 | try: |
---|
149 | try: |
---|
150 | host.remove(dir_name) |
---|
151 | except ftputil.error.PermanentError as exc: |
---|
152 | assert str(exc).startswith( |
---|
153 | "remove/unlink can only delete files") |
---|
154 | else: |
---|
155 | pytest.fail("we shouldn't have come here") |
---|
156 | finally: |
---|
157 | # Delete empty directory. |
---|
158 | host.rmdir(dir_name) |
---|
159 | files = host.listdir(host.curdir) |
---|
160 | assert dir_name not in files |
---|
161 | |
---|
162 | def test_makedirs_without_existing_dirs(self): |
---|
163 | host = self.host |
---|
164 | # No `_dir1_` yet |
---|
165 | assert "_dir1_" not in host.listdir(host.curdir) |
---|
166 | # Vanilla case, all should go well. |
---|
167 | host.makedirs("_dir1_/dir2/dir3/dir4") |
---|
168 | self.cleaner.add_dir("_dir1_") |
---|
169 | # Check host. |
---|
170 | assert host.path.isdir("_dir1_") |
---|
171 | assert host.path.isdir("_dir1_/dir2") |
---|
172 | assert host.path.isdir("_dir1_/dir2/dir3") |
---|
173 | assert host.path.isdir("_dir1_/dir2/dir3/dir4") |
---|
174 | |
---|
175 | def test_makedirs_from_non_root_directory(self): |
---|
176 | # This is a testcase for issue #22, see |
---|
177 | # http://ftputil.sschwarzer.net/trac/ticket/22 . |
---|
178 | host = self.host |
---|
179 | # No `_dir1_` and `_dir2_` yet |
---|
180 | assert "_dir1_" not in host.listdir(host.curdir) |
---|
181 | assert "_dir2_" not in host.listdir(host.curdir) |
---|
182 | # Part 1: Try to make directories starting from `_dir1_` and |
---|
183 | # change to non-root directory. |
---|
184 | self.cleaner.add_dir("_dir1_") |
---|
185 | host.mkdir("_dir1_") |
---|
186 | host.chdir("_dir1_") |
---|
187 | host.makedirs("_dir2_/_dir3_") |
---|
188 | # Test for expected directory hierarchy. |
---|
189 | assert host.path.isdir("/_dir1_") |
---|
190 | assert host.path.isdir("/_dir1_/_dir2_") |
---|
191 | assert host.path.isdir("/_dir1_/_dir2_/_dir3_") |
---|
192 | assert not host.path.isdir("/_dir1_/_dir1_") |
---|
193 | # Remove all but the directory we're in. |
---|
194 | host.rmdir("/_dir1_/_dir2_/_dir3_") |
---|
195 | host.rmdir("/_dir1_/_dir2_") |
---|
196 | # Part 2: Try to make directories starting from root. |
---|
197 | self.cleaner.add_dir("/_dir2_") |
---|
198 | host.makedirs("/_dir2_/_dir3_") |
---|
199 | # Test for expected directory hierarchy |
---|
200 | assert host.path.isdir("/_dir2_") |
---|
201 | assert host.path.isdir("/_dir2_/_dir3_") |
---|
202 | assert not host.path.isdir("/_dir1_/_dir2_") |
---|
203 | |
---|
204 | def test_makedirs_of_existing_directory(self): |
---|
205 | host = self.host |
---|
206 | # The (chrooted) login directory |
---|
207 | host.makedirs("/") |
---|
208 | |
---|
209 | def test_makedirs_with_file_in_the_way(self): |
---|
210 | host = self.host |
---|
211 | self.cleaner.add_dir("_dir1_") |
---|
212 | host.mkdir("_dir1_") |
---|
213 | self.make_remote_file("_dir1_/file1") |
---|
214 | # Try it. |
---|
215 | with pytest.raises(ftputil.error.PermanentError): |
---|
216 | host.makedirs("_dir1_/file1") |
---|
217 | with pytest.raises(ftputil.error.PermanentError): |
---|
218 | host.makedirs("_dir1_/file1/dir2") |
---|
219 | |
---|
220 | def test_makedirs_with_existing_directory(self): |
---|
221 | host = self.host |
---|
222 | self.cleaner.add_dir("_dir1_") |
---|
223 | host.mkdir("_dir1_") |
---|
224 | host.makedirs("_dir1_/dir2") |
---|
225 | # Check |
---|
226 | assert host.path.isdir("_dir1_") |
---|
227 | assert host.path.isdir("_dir1_/dir2") |
---|
228 | |
---|
229 | def test_makedirs_in_non_writable_directory(self): |
---|
230 | host = self.host |
---|
231 | # Preparation: `rootdir1` exists but is only writable by root. |
---|
232 | with pytest.raises(ftputil.error.PermanentError): |
---|
233 | host.makedirs("rootdir1/dir2") |
---|
234 | |
---|
235 | def test_makedirs_with_writable_directory_at_end(self): |
---|
236 | host = self.host |
---|
237 | self.cleaner.add_dir("rootdir2/dir2") |
---|
238 | # Preparation: `rootdir2` exists but is only writable by root. |
---|
239 | # `dir2` is writable by regular ftp users. Both directories |
---|
240 | # below should work. |
---|
241 | host.makedirs("rootdir2/dir2") |
---|
242 | host.makedirs("rootdir2/dir2/dir3") |
---|
243 | |
---|
244 | |
---|
245 | class TestRemoval(RealFTPTest): |
---|
246 | |
---|
247 | def test_rmtree_without_error_handler(self): |
---|
248 | host = self.host |
---|
249 | # Build a tree. |
---|
250 | self.cleaner.add_dir("_dir1_") |
---|
251 | host.makedirs("_dir1_/dir2") |
---|
252 | self.make_remote_file("_dir1_/file1") |
---|
253 | self.make_remote_file("_dir1_/file2") |
---|
254 | self.make_remote_file("_dir1_/dir2/file3") |
---|
255 | self.make_remote_file("_dir1_/dir2/file4") |
---|
256 | # Try to remove a _file_ with `rmtree`. |
---|
257 | with pytest.raises(ftputil.error.PermanentError): |
---|
258 | host.rmtree("_dir1_/file2") |
---|
259 | # Remove `dir2`. |
---|
260 | host.rmtree("_dir1_/dir2") |
---|
261 | assert not host.path.exists("_dir1_/dir2") |
---|
262 | assert host.path.exists("_dir1_/file2") |
---|
263 | # Re-create `dir2` and remove `_dir1_`. |
---|
264 | host.mkdir("_dir1_/dir2") |
---|
265 | self.make_remote_file("_dir1_/dir2/file3") |
---|
266 | self.make_remote_file("_dir1_/dir2/file4") |
---|
267 | host.rmtree("_dir1_") |
---|
268 | assert not host.path.exists("_dir1_") |
---|
269 | |
---|
270 | def test_rmtree_with_error_handler(self): |
---|
271 | host = self.host |
---|
272 | self.cleaner.add_dir("_dir1_") |
---|
273 | host.mkdir("_dir1_") |
---|
274 | self.make_remote_file("_dir1_/file1") |
---|
275 | # Prepare error "handler" |
---|
276 | log = [] |
---|
277 | def error_handler(*args): |
---|
278 | log.append(args) |
---|
279 | # Try to remove a file as root "directory". |
---|
280 | host.rmtree("_dir1_/file1", ignore_errors=True, onerror=error_handler) |
---|
281 | assert log == [] |
---|
282 | host.rmtree("_dir1_/file1", ignore_errors=False, onerror=error_handler) |
---|
283 | assert log[0][0] == host.listdir |
---|
284 | assert log[0][1] == "_dir1_/file1" |
---|
285 | assert log[1][0] == host.rmdir |
---|
286 | assert log[1][1] == "_dir1_/file1" |
---|
287 | host.rmtree("_dir1_") |
---|
288 | # Try to remove a non-existent directory. |
---|
289 | del log[:] |
---|
290 | host.rmtree("_dir1_", ignore_errors=False, onerror=error_handler) |
---|
291 | assert log[0][0] == host.listdir |
---|
292 | assert log[0][1] == "_dir1_" |
---|
293 | assert log[1][0] == host.rmdir |
---|
294 | assert log[1][1] == "_dir1_" |
---|
295 | |
---|
296 | def test_remove_non_existent_item(self): |
---|
297 | host = self.host |
---|
298 | with pytest.raises(ftputil.error.PermanentError): |
---|
299 | host.remove("nonexistent") |
---|
300 | |
---|
301 | def test_remove_existing_file(self): |
---|
302 | self.cleaner.add_file("_testfile_") |
---|
303 | self.make_remote_file("_testfile_") |
---|
304 | host = self.host |
---|
305 | assert host.path.isfile("_testfile_") |
---|
306 | host.remove("_testfile_") |
---|
307 | assert not host.path.exists("_testfile_") |
---|
308 | |
---|
309 | |
---|
310 | class TestWalk(RealFTPTest): |
---|
311 | """ |
---|
312 | Walk the directory tree |
---|
313 | |
---|
314 | walk_test |
---|
315 | ├── dir1 |
---|
316 | │ ├── dir11 |
---|
317 | │ └── dir12 |
---|
318 | │ ├── dir123 |
---|
319 | │ │ └── file1234 |
---|
320 | │ ├── file121 |
---|
321 | │ └── file122 |
---|
322 | ├── dir2 |
---|
323 | ├── dir3 |
---|
324 | │ ├── dir31 |
---|
325 | │ ├── dir32 -> ../dir1/dir12/dir123 |
---|
326 | │ ├── file31 |
---|
327 | │ └── file32 |
---|
328 | └── file4 |
---|
329 | |
---|
330 | and check if the results are the expected ones. |
---|
331 | """ |
---|
332 | |
---|
333 | def _walk_test(self, expected_result, **walk_kwargs): |
---|
334 | """Walk the directory and test results.""" |
---|
335 | # Collect data using `walk`. |
---|
336 | actual_result = [] |
---|
337 | for items in self.host.walk(**walk_kwargs): |
---|
338 | actual_result.append(items) |
---|
339 | # Compare with expected results. |
---|
340 | assert len(actual_result) == len(expected_result) |
---|
341 | for index, _ in enumerate(actual_result): |
---|
342 | assert actual_result[index] == expected_result[index] |
---|
343 | |
---|
344 | def test_walk_topdown(self): |
---|
345 | # Preparation: build tree in directory `walk_test`. |
---|
346 | expected_result = [ |
---|
347 | ("walk_test", |
---|
348 | ["dir1", "dir2", "dir3"], |
---|
349 | ["file4"]), |
---|
350 | # |
---|
351 | ("walk_test/dir1", |
---|
352 | ["dir11", "dir12"], |
---|
353 | []), |
---|
354 | # |
---|
355 | ("walk_test/dir1/dir11", |
---|
356 | [], |
---|
357 | []), |
---|
358 | # |
---|
359 | ("walk_test/dir1/dir12", |
---|
360 | ["dir123"], |
---|
361 | ["file121", "file122"]), |
---|
362 | # |
---|
363 | ("walk_test/dir1/dir12/dir123", |
---|
364 | [], |
---|
365 | ["file1234"]), |
---|
366 | # |
---|
367 | ("walk_test/dir2", |
---|
368 | [], |
---|
369 | []), |
---|
370 | # |
---|
371 | ("walk_test/dir3", |
---|
372 | ["dir31", "dir32"], |
---|
373 | ["file31", "file32"]), |
---|
374 | # |
---|
375 | ("walk_test/dir3/dir31", |
---|
376 | [], |
---|
377 | []), |
---|
378 | ] |
---|
379 | self._walk_test(expected_result, top="walk_test") |
---|
380 | |
---|
381 | def test_walk_depth_first(self): |
---|
382 | # Preparation: build tree in directory `walk_test` |
---|
383 | expected_result = [ |
---|
384 | ("walk_test/dir1/dir11", |
---|
385 | [], |
---|
386 | []), |
---|
387 | # |
---|
388 | ("walk_test/dir1/dir12/dir123", |
---|
389 | [], |
---|
390 | ["file1234"]), |
---|
391 | # |
---|
392 | ("walk_test/dir1/dir12", |
---|
393 | ["dir123"], |
---|
394 | ["file121", "file122"]), |
---|
395 | # |
---|
396 | ("walk_test/dir1", |
---|
397 | ["dir11", "dir12"], |
---|
398 | []), |
---|
399 | # |
---|
400 | ("walk_test/dir2", |
---|
401 | [], |
---|
402 | []), |
---|
403 | # |
---|
404 | ("walk_test/dir3/dir31", |
---|
405 | [], |
---|
406 | []), |
---|
407 | # |
---|
408 | ("walk_test/dir3", |
---|
409 | ["dir31", "dir32"], |
---|
410 | ["file31", "file32"]), |
---|
411 | # |
---|
412 | ("walk_test", |
---|
413 | ["dir1", "dir2", "dir3"], |
---|
414 | ["file4"]) |
---|
415 | ] |
---|
416 | self._walk_test(expected_result, top="walk_test", topdown=False) |
---|
417 | |
---|
418 | def test_walk_following_links(self): |
---|
419 | # Preparation: build tree in directory `walk_test`. |
---|
420 | expected_result = [ |
---|
421 | ("walk_test", |
---|
422 | ["dir1", "dir2", "dir3"], |
---|
423 | ["file4"]), |
---|
424 | # |
---|
425 | ("walk_test/dir1", |
---|
426 | ["dir11", "dir12"], |
---|
427 | []), |
---|
428 | # |
---|
429 | ("walk_test/dir1/dir11", |
---|
430 | [], |
---|
431 | []), |
---|
432 | # |
---|
433 | ("walk_test/dir1/dir12", |
---|
434 | ["dir123"], |
---|
435 | ["file121", "file122"]), |
---|
436 | # |
---|
437 | ("walk_test/dir1/dir12/dir123", |
---|
438 | [], |
---|
439 | ["file1234"]), |
---|
440 | # |
---|
441 | ("walk_test/dir2", |
---|
442 | [], |
---|
443 | []), |
---|
444 | # |
---|
445 | ("walk_test/dir3", |
---|
446 | ["dir31", "dir32"], |
---|
447 | ["file31", "file32"]), |
---|
448 | # |
---|
449 | ("walk_test/dir3/dir31", |
---|
450 | [], |
---|
451 | []), |
---|
452 | # |
---|
453 | ("walk_test/dir3/dir32", |
---|
454 | [], |
---|
455 | ["file1234"]), |
---|
456 | ] |
---|
457 | self._walk_test(expected_result, top="walk_test", followlinks=True) |
---|
458 | |
---|
459 | |
---|
460 | class TestRename(RealFTPTest): |
---|
461 | |
---|
462 | def test_rename(self): |
---|
463 | host = self.host |
---|
464 | # Make sure the target of the renaming operation is removed. |
---|
465 | self.cleaner.add_file("_testfile2_") |
---|
466 | self.make_remote_file("_testfile1_") |
---|
467 | host.rename("_testfile1_", "_testfile2_") |
---|
468 | assert not host.path.exists("_testfile1_") |
---|
469 | assert host.path.exists("_testfile2_") |
---|
470 | |
---|
471 | def test_rename_with_spaces_in_directory(self): |
---|
472 | host = self.host |
---|
473 | dir_name = "_dir with spaces_" |
---|
474 | self.cleaner.add_dir(dir_name) |
---|
475 | host.mkdir(dir_name) |
---|
476 | self.make_remote_file(dir_name + "/testfile1") |
---|
477 | host.rename(dir_name + "/testfile1", dir_name + "/testfile2") |
---|
478 | assert not host.path.exists(dir_name + "/testfile1") |
---|
479 | assert host.path.exists(dir_name + "/testfile2") |
---|
480 | |
---|
481 | |
---|
482 | class TestStat(RealFTPTest): |
---|
483 | |
---|
484 | def test_stat(self): |
---|
485 | host = self.host |
---|
486 | dir_name = "_testdir_" |
---|
487 | file_name = host.path.join(dir_name, "_nonempty_") |
---|
488 | # Make a directory and a file in it. |
---|
489 | self.cleaner.add_dir(dir_name) |
---|
490 | host.mkdir(dir_name) |
---|
491 | with host.open(file_name, "wb") as fobj: |
---|
492 | fobj.write(b"abc\x12\x34def\t") |
---|
493 | # Do some stats |
---|
494 | # - dir |
---|
495 | dir_stat = host.stat(dir_name) |
---|
496 | assert isinstance(dir_stat._st_name, str) |
---|
497 | assert host.listdir(dir_name) == ["_nonempty_"] |
---|
498 | assert host.path.isdir(dir_name) |
---|
499 | assert not host.path.isfile(dir_name) |
---|
500 | assert not host.path.islink(dir_name) |
---|
501 | # - file |
---|
502 | file_stat = host.stat(file_name) |
---|
503 | assert isinstance(file_stat._st_name, str) |
---|
504 | assert not host.path.isdir(file_name) |
---|
505 | assert host.path.isfile(file_name) |
---|
506 | assert not host.path.islink(file_name) |
---|
507 | assert host.path.getsize(file_name) == 9 |
---|
508 | # - file's modification time; allow up to two minutes difference |
---|
509 | host.synchronize_times() |
---|
510 | server_mtime = host.path.getmtime(file_name) |
---|
511 | client_mtime = time.mktime(time.localtime()) |
---|
512 | calculated_time_shift = server_mtime - client_mtime |
---|
513 | assert not abs(calculated_time_shift-host.time_shift()) > 120 |
---|
514 | |
---|
515 | def test_issomething_for_nonexistent_directory(self): |
---|
516 | host = self.host |
---|
517 | # Check if we get the right results if even the containing |
---|
518 | # directory doesn't exist (see ticket #66). |
---|
519 | nonexistent_path = "/nonexistent/nonexistent" |
---|
520 | assert not host.path.isdir(nonexistent_path) |
---|
521 | assert not host.path.isfile(nonexistent_path) |
---|
522 | assert not host.path.islink(nonexistent_path) |
---|
523 | |
---|
524 | def test_special_broken_link(self): |
---|
525 | # Test for ticket #39. |
---|
526 | host = self.host |
---|
527 | broken_link_name = os.path.join("dir_with_broken_link", "nonexistent") |
---|
528 | assert (host.lstat(broken_link_name)._st_target == |
---|
529 | "../nonexistent/nonexistent") |
---|
530 | assert not host.path.isdir(broken_link_name) |
---|
531 | assert not host.path.isfile(broken_link_name) |
---|
532 | assert host.path.islink(broken_link_name) |
---|
533 | |
---|
534 | def test_concurrent_access(self): |
---|
535 | self.make_remote_file("_testfile_") |
---|
536 | with ftputil.FTPHost(*self.login_data) as host1: |
---|
537 | with ftputil.FTPHost(*self.login_data) as host2: |
---|
538 | stat_result1 = host1.stat("_testfile_") |
---|
539 | stat_result2 = host2.stat("_testfile_") |
---|
540 | assert stat_result1 == stat_result2 |
---|
541 | host2.remove("_testfile_") |
---|
542 | # Can still get the result via `host1` |
---|
543 | stat_result1 = host1.stat("_testfile_") |
---|
544 | assert stat_result1 == stat_result2 |
---|
545 | # Stat'ing on `host2` gives an exception. |
---|
546 | with pytest.raises(ftputil.error.PermanentError): |
---|
547 | host2.stat("_testfile_") |
---|
548 | # Stat'ing on `host1` after invalidation |
---|
549 | absolute_path = host1.path.join(host1.getcwd(), "_testfile_") |
---|
550 | host1.stat_cache.invalidate(absolute_path) |
---|
551 | with pytest.raises(ftputil.error.PermanentError): |
---|
552 | host1.stat("_testfile_") |
---|
553 | |
---|
554 | def test_cache_auto_resizing(self): |
---|
555 | """Test if the cache is resized appropriately.""" |
---|
556 | host = self.host |
---|
557 | cache = host.stat_cache._cache |
---|
558 | # Make sure the cache size isn't adjusted towards smaller values. |
---|
559 | unused_entries = host.listdir("walk_test") |
---|
560 | assert cache.size == ftputil.stat_cache.StatCache._DEFAULT_CACHE_SIZE |
---|
561 | # Make the cache very small initially and see if it gets resized. |
---|
562 | cache.size = 2 |
---|
563 | entries = host.listdir("walk_test") |
---|
564 | # The adjusted cache size should be larger or equal to the |
---|
565 | # number of items in `walk_test` and its parent directory. The |
---|
566 | # latter is read implicitly upon `listdir`'s `isdir` call. |
---|
567 | expected_min_cache_size = max(len(host.listdir(host.curdir)), |
---|
568 | len(entries)) |
---|
569 | assert cache.size >= expected_min_cache_size |
---|
570 | |
---|
571 | |
---|
572 | class TestUploadAndDownload(RealFTPTest): |
---|
573 | """Test upload and download (including time shift test).""" |
---|
574 | |
---|
575 | def test_time_shift(self): |
---|
576 | self.host.synchronize_times() |
---|
577 | assert self.host.time_shift() == EXPECTED_TIME_SHIFT |
---|
578 | |
---|
579 | @pytest.mark.slow_test |
---|
580 | def test_upload(self): |
---|
581 | host = self.host |
---|
582 | host.synchronize_times() |
---|
583 | local_file = "_local_file_" |
---|
584 | remote_file = "_remote_file_" |
---|
585 | # Make local file to upload. |
---|
586 | self.make_local_file() |
---|
587 | # Wait, else small time differences between client and server |
---|
588 | # actually could trigger the update. |
---|
589 | time.sleep(65) |
---|
590 | try: |
---|
591 | self.cleaner.add_file(remote_file) |
---|
592 | host.upload(local_file, remote_file) |
---|
593 | # Retry; shouldn't be uploaded |
---|
594 | uploaded = host.upload_if_newer(local_file, remote_file) |
---|
595 | assert uploaded is False |
---|
596 | # Rewrite the local file. |
---|
597 | self.make_local_file() |
---|
598 | # Retry; should be uploaded now |
---|
599 | uploaded = host.upload_if_newer(local_file, remote_file) |
---|
600 | assert uploaded is True |
---|
601 | finally: |
---|
602 | # Clean up |
---|
603 | os.unlink(local_file) |
---|
604 | |
---|
605 | @pytest.mark.slow_test |
---|
606 | def test_download(self): |
---|
607 | host = self.host |
---|
608 | host.synchronize_times() |
---|
609 | local_file = "_local_file_" |
---|
610 | remote_file = "_remote_file_" |
---|
611 | # Make a remote file. |
---|
612 | self.make_remote_file(remote_file) |
---|
613 | # File should be downloaded as it's not present yet. |
---|
614 | downloaded = host.download_if_newer(remote_file, local_file) |
---|
615 | assert downloaded is True |
---|
616 | try: |
---|
617 | # If the remote file, taking the datetime precision into |
---|
618 | # account, _might_ be newer, the file will be downloaded |
---|
619 | # again. To prevent this, wait a bit over a minute (the |
---|
620 | # remote precision), then "touch" the local file. |
---|
621 | time.sleep(65) |
---|
622 | # Create empty file. |
---|
623 | with open(local_file, "w") as fobj: |
---|
624 | pass |
---|
625 | # Local file is present and newer, so shouldn't download. |
---|
626 | downloaded = host.download_if_newer(remote_file, local_file) |
---|
627 | assert downloaded is False |
---|
628 | # Re-make the remote file. |
---|
629 | self.make_remote_file(remote_file) |
---|
630 | # Local file is present but possibly older (taking the |
---|
631 | # possible deviation because of the precision into account), |
---|
632 | # so should download. |
---|
633 | downloaded = host.download_if_newer(remote_file, local_file) |
---|
634 | assert downloaded is True |
---|
635 | finally: |
---|
636 | # Clean up. |
---|
637 | os.unlink(local_file) |
---|
638 | |
---|
639 | def test_callback_with_transfer(self): |
---|
640 | host = self.host |
---|
641 | FILE_NAME = "debian-keyring.tar.gz" |
---|
642 | # Default chunk size as in `FTPHost.copyfileobj` |
---|
643 | MAX_COPY_CHUNK_SIZE = ftputil.file_transfer.MAX_COPY_CHUNK_SIZE |
---|
644 | file_size = host.path.getsize(FILE_NAME) |
---|
645 | chunk_count, _ = divmod(file_size, MAX_COPY_CHUNK_SIZE) |
---|
646 | # Add one chunk for remainder. |
---|
647 | chunk_count += 1 |
---|
648 | # Define a callback that just collects all data passed to it. |
---|
649 | transferred_chunks_list = [] |
---|
650 | def test_callback(chunk): |
---|
651 | transferred_chunks_list.append(chunk) |
---|
652 | try: |
---|
653 | host.download(FILE_NAME, FILE_NAME, callback=test_callback) |
---|
654 | # Construct a list of data chunks we expect. |
---|
655 | expected_chunks_list = [] |
---|
656 | with open(FILE_NAME, "rb") as downloaded_fobj: |
---|
657 | while True: |
---|
658 | chunk = downloaded_fobj.read(MAX_COPY_CHUNK_SIZE) |
---|
659 | if not chunk: |
---|
660 | break |
---|
661 | expected_chunks_list.append(chunk) |
---|
662 | # Examine data collected by callback function. |
---|
663 | assert len(transferred_chunks_list) == chunk_count |
---|
664 | assert transferred_chunks_list == expected_chunks_list |
---|
665 | finally: |
---|
666 | os.unlink(FILE_NAME) |
---|
667 | |
---|
668 | |
---|
669 | class TestFTPFiles(RealFTPTest): |
---|
670 | |
---|
671 | def test_only_closed_children(self): |
---|
672 | REMOTE_FILE_NAME = "CONTENTS" |
---|
673 | host = self.host |
---|
674 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj1: |
---|
675 | # Create empty file and close it. |
---|
676 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj2: |
---|
677 | pass |
---|
678 | # This should re-use the second child because the first isn't |
---|
679 | # closed but the second is. |
---|
680 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj: |
---|
681 | assert len(host._children) == 2 |
---|
682 | assert file_obj._host is host._children[1] |
---|
683 | |
---|
684 | def test_no_timed_out_children(self): |
---|
685 | REMOTE_FILE_NAME = "CONTENTS" |
---|
686 | host = self.host |
---|
687 | # Implicitly create child host object. |
---|
688 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj1: |
---|
689 | pass |
---|
690 | # Monkey-patch file to simulate an FTP server timeout below. |
---|
691 | def timed_out_pwd(): |
---|
692 | raise ftplib.error_temp("simulated timeout") |
---|
693 | file_obj1._host._session.pwd = timed_out_pwd |
---|
694 | # Try to get a file - which shouldn't be the timed-out file. |
---|
695 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj2: |
---|
696 | assert file_obj1 is not file_obj2 |
---|
697 | # Re-use closed and not timed-out child session. |
---|
698 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj3: |
---|
699 | pass |
---|
700 | assert file_obj2 is file_obj3 |
---|
701 | |
---|
702 | def test_no_delayed_226_children(self): |
---|
703 | REMOTE_FILE_NAME = "CONTENTS" |
---|
704 | host = self.host |
---|
705 | # Implicitly create child host object. |
---|
706 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj1: |
---|
707 | pass |
---|
708 | # Monkey-patch file to simulate an FTP server timeout below. |
---|
709 | def timed_out_pwd(): |
---|
710 | raise ftplib.error_reply("delayed 226 reply") |
---|
711 | file_obj1._host._session.pwd = timed_out_pwd |
---|
712 | # Try to get a file - which shouldn't be the timed-out file. |
---|
713 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj2: |
---|
714 | assert file_obj1 is not file_obj2 |
---|
715 | # Re-use closed and not timed-out child session. |
---|
716 | with host.open(REMOTE_FILE_NAME, "rb") as file_obj3: |
---|
717 | pass |
---|
718 | assert file_obj2 is file_obj3 |
---|
719 | |
---|
720 | |
---|
721 | class TestChmod(RealFTPTest): |
---|
722 | |
---|
723 | def assert_mode(self, path, expected_mode): |
---|
724 | """ |
---|
725 | Return an integer containing the allowed bits in the mode |
---|
726 | change command. |
---|
727 | |
---|
728 | The `FTPHost` object to test against is `self.host`. |
---|
729 | """ |
---|
730 | full_mode = self.host.stat(path).st_mode |
---|
731 | # Remove flags we can't set via `chmod`. |
---|
732 | # Allowed flags according to Python documentation |
---|
733 | # https://docs.python.org/library/stat.html |
---|
734 | allowed_flags = [stat.S_ISUID, stat.S_ISGID, stat.S_ENFMT, |
---|
735 | stat.S_ISVTX, stat.S_IREAD, stat.S_IWRITE, stat.S_IEXEC, |
---|
736 | stat.S_IRWXU, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR, |
---|
737 | stat.S_IRWXG, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP, |
---|
738 | stat.S_IRWXO, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH] |
---|
739 | allowed_mask = functools.reduce(operator.or_, allowed_flags) |
---|
740 | mode = full_mode & allowed_mask |
---|
741 | assert mode == expected_mode, ( |
---|
742 | "mode {0:o} != {1:o}".format(mode, expected_mode)) |
---|
743 | |
---|
744 | def test_chmod_existing_directory(self): |
---|
745 | host = self.host |
---|
746 | host.mkdir("_test dir_") |
---|
747 | self.cleaner.add_dir("_test dir_") |
---|
748 | # Set/get mode of a directory. |
---|
749 | host.chmod("_test dir_", 0o757) |
---|
750 | self.assert_mode("_test dir_", 0o757) |
---|
751 | # Set/get mode in nested directory. |
---|
752 | host.mkdir("_test dir_/nested_dir") |
---|
753 | self.cleaner.add_dir("_test dir_/nested_dir") |
---|
754 | host.chmod("_test dir_/nested_dir", 0o757) |
---|
755 | self.assert_mode("_test dir_/nested_dir", 0o757) |
---|
756 | |
---|
757 | def test_chmod_existing_file(self): |
---|
758 | host = self.host |
---|
759 | host.mkdir("_test dir_") |
---|
760 | self.cleaner.add_dir("_test dir_") |
---|
761 | # Set/get mode on a file. |
---|
762 | file_name = host.path.join("_test dir_", "_testfile_") |
---|
763 | self.make_remote_file(file_name) |
---|
764 | host.chmod(file_name, 0o646) |
---|
765 | self.assert_mode(file_name, 0o646) |
---|
766 | |
---|
767 | def test_chmod_nonexistent_path(self): |
---|
768 | # Set/get mode of a non-existing item. |
---|
769 | with pytest.raises(ftputil.error.PermanentError): |
---|
770 | self.host.chmod("nonexistent", 0o757) |
---|
771 | |
---|
772 | def test_cache_invalidation(self): |
---|
773 | host = self.host |
---|
774 | host.mkdir("_test dir_") |
---|
775 | self.cleaner.add_dir("_test dir_") |
---|
776 | # Make sure the mode is in the cache. |
---|
777 | unused_stat_result = host.stat("_test dir_") |
---|
778 | # Set/get mode of the directory. |
---|
779 | host.chmod("_test dir_", 0o757) |
---|
780 | self.assert_mode("_test dir_", 0o757) |
---|
781 | # Set/get mode on a file. |
---|
782 | file_name = host.path.join("_test dir_", "_testfile_") |
---|
783 | self.make_remote_file(file_name) |
---|
784 | # Make sure the mode is in the cache. |
---|
785 | unused_stat_result = host.stat(file_name) |
---|
786 | host.chmod(file_name, 0o646) |
---|
787 | self.assert_mode(file_name, 0o646) |
---|
788 | |
---|
789 | |
---|
790 | class TestRestArgument(RealFTPTest): |
---|
791 | |
---|
792 | TEST_FILE_NAME = "rest_test" |
---|
793 | |
---|
794 | def setup_method(self, method): |
---|
795 | super(TestRestArgument, self).setup_method(method) |
---|
796 | # Write test file. |
---|
797 | with self.host.open(self.TEST_FILE_NAME, "wb") as fobj: |
---|
798 | fobj.write(b"abcdefghijkl") |
---|
799 | self.cleaner.add_file(self.TEST_FILE_NAME) |
---|
800 | |
---|
801 | def test_for_reading(self): |
---|
802 | """ |
---|
803 | If a `rest` argument is passed to `open`, the following read |
---|
804 | operation should start at the byte given by `rest`. |
---|
805 | """ |
---|
806 | with self.host.open(self.TEST_FILE_NAME, "rb", rest=3) as fobj: |
---|
807 | data = fobj.read() |
---|
808 | assert data == b"defghijkl" |
---|
809 | |
---|
810 | def test_for_writing(self): |
---|
811 | """ |
---|
812 | If a `rest` argument is passed to `open`, the following write |
---|
813 | operation should start writing at the byte given by `rest`. |
---|
814 | """ |
---|
815 | with self.host.open(self.TEST_FILE_NAME, "wb", rest=3) as fobj: |
---|
816 | fobj.write(b"123") |
---|
817 | with self.host.open(self.TEST_FILE_NAME, "rb") as fobj: |
---|
818 | data = fobj.read() |
---|
819 | assert data == b"abc123" |
---|
820 | |
---|
821 | def test_invalid_read_from_text_file(self): |
---|
822 | """ |
---|
823 | If the `rest` argument is used for reading from a text file, |
---|
824 | a `CommandNotImplementedError` should be raised. |
---|
825 | """ |
---|
826 | with pytest.raises(ftputil.error.CommandNotImplementedError): |
---|
827 | self.host.open(self.TEST_FILE_NAME, "r", rest=3) |
---|
828 | |
---|
829 | def test_invalid_write_to_text_file(self): |
---|
830 | """ |
---|
831 | If the `rest` argument is used for reading from a text file, |
---|
832 | a `CommandNotImplementedError` should be raised. |
---|
833 | """ |
---|
834 | with pytest.raises(ftputil.error.CommandNotImplementedError): |
---|
835 | self.host.open(self.TEST_FILE_NAME, "w", rest=3) |
---|
836 | |
---|
837 | # There are no tests for reading and writing beyond the end of a |
---|
838 | # file. For example, if the remote file is 10 bytes long and |
---|
839 | # `open(remote_file, "rb", rest=100)` is used, the server may |
---|
840 | # return an error status code or not. |
---|
841 | # |
---|
842 | # The server I use for testing returns a 554 status when |
---|
843 | # attempting to _read_ beyond the end of the file. On the other |
---|
844 | # hand, if attempting to _write_ beyond the end of the file, the |
---|
845 | # server accepts the request, but starts writing after the end of |
---|
846 | # the file, i. e. appends to the file. |
---|
847 | # |
---|
848 | # Instead of expecting certain responses that may differ between |
---|
849 | # server implementations, I leave the bahavior for too large |
---|
850 | # `rest` arguments undefined. In practice, this shouldn't be a |
---|
851 | # problem because the `rest` argument should only be used for |
---|
852 | # error recovery, and in this case a valid byte count for the |
---|
853 | # `rest` argument should be known. |
---|
854 | |
---|
855 | |
---|
856 | class TestOther(RealFTPTest): |
---|
857 | |
---|
858 | def test_open_for_reading(self): |
---|
859 | # Test for issues #17 and #51, |
---|
860 | # http://ftputil.sschwarzer.net/trac/ticket/17 and |
---|
861 | # http://ftputil.sschwarzer.net/trac/ticket/51 . |
---|
862 | file1 = self.host.open("debian-keyring.tar.gz", "rb") |
---|
863 | time.sleep(1) |
---|
864 | # Depending on the FTP server, this might return a status code |
---|
865 | # unexpected by `ftplib` or block the socket connection until |
---|
866 | # a server-side timeout. |
---|
867 | file1.close() |
---|
868 | |
---|
869 | def test_subsequent_reading(self): |
---|
870 | # Open a file for reading. |
---|
871 | with self.host.open("CONTENTS", "rb") as file1: |
---|
872 | pass |
---|
873 | # Make sure that there are no problems if the connection is reused. |
---|
874 | with self.host.open("CONTENTS", "rb") as file2: |
---|
875 | pass |
---|
876 | assert file1._session is file2._session |
---|
877 | |
---|
878 | def test_names_with_spaces(self): |
---|
879 | # Test if directories and files with spaces in their names |
---|
880 | # can be used. |
---|
881 | host = self.host |
---|
882 | assert host.path.isdir("dir with spaces") |
---|
883 | assert (host.listdir("dir with spaces") == |
---|
884 | ["second dir", "some file", "some_file"]) |
---|
885 | assert host.path.isdir("dir with spaces/second dir") |
---|
886 | assert host.path.isfile("dir with spaces/some_file") |
---|
887 | assert host.path.isfile("dir with spaces/some file") |
---|
888 | |
---|
889 | def test_synchronize_times_without_write_access(self): |
---|
890 | """Test failing synchronization because of non-writable directory.""" |
---|
891 | host = self.host |
---|
892 | # This isn't writable by the ftp account the tests are run under. |
---|
893 | host.chdir("rootdir1") |
---|
894 | with pytest.raises(ftputil.error.TimeShiftError): |
---|
895 | host.synchronize_times() |
---|
896 | |
---|
897 | def test_listdir_with_non_ascii_byte_string(self): |
---|
898 | """ |
---|
899 | `listdir` should accept byte strings with non-ASCII |
---|
900 | characters and return non-ASCII characters in directory or |
---|
901 | file names. |
---|
902 | """ |
---|
903 | host = self.host |
---|
904 | path = "äbc".encode("UTF-8") |
---|
905 | names = host.listdir(path) |
---|
906 | assert names[0] == b"file1" |
---|
907 | assert names[1] == "file1_ö".encode("UTF-8") |
---|
908 | |
---|
909 | def test_listdir_with_non_ascii_unicode_string(self): |
---|
910 | """ |
---|
911 | `listdir` should accept unicode strings with non-ASCII |
---|
912 | characters and return non-ASCII characters in directory or |
---|
913 | file names. |
---|
914 | """ |
---|
915 | host = self.host |
---|
916 | # `ftplib` under Python 3 only works correctly if the unicode |
---|
917 | # strings are decoded from latin1. |
---|
918 | path = "äbc".encode("UTF-8").decode("latin1") |
---|
919 | names = host.listdir(path) |
---|
920 | assert names[0] == "file1" |
---|
921 | assert names[1] == "file1_ö".encode("UTF-8").decode("latin1") |
---|
922 | |
---|
923 | def test_path_with_non_latin1_unicode_string(self): |
---|
924 | """ |
---|
925 | ftputil operations shouldn't accept file paths with non-latin1 |
---|
926 | characters. |
---|
927 | """ |
---|
928 | # Use some musical symbols. These are certainly not latin1. |
---|
929 | path = "𝄞𝄢" |
---|
930 | # `UnicodeEncodeError` is also the exception that `ftplib` |
---|
931 | # raises if it gets a non-latin1 path. |
---|
932 | with pytest.raises(UnicodeEncodeError): |
---|
933 | self.host.mkdir(path) |
---|
934 | |
---|
935 | def test_list_a_option(self): |
---|
936 | # For this test to pass, the server must _not_ list "hidden" |
---|
937 | # files by default but instead only when the `LIST` `-a` |
---|
938 | # option is used. |
---|
939 | host = self.host |
---|
940 | assert not host.use_list_a_option |
---|
941 | directory_entries = host.listdir(host.curdir) |
---|
942 | assert ".hidden" not in directory_entries |
---|
943 | # Switch on showing of hidden paths. |
---|
944 | host.use_list_a_option = True |
---|
945 | directory_entries = host.listdir(host.curdir) |
---|
946 | assert ".hidden" in directory_entries |
---|
947 | |
---|
948 | def _make_objects_to_be_garbage_collected(self): |
---|
949 | for _ in range(10): |
---|
950 | with ftputil.FTPHost(*self.login_data) as host: |
---|
951 | for _ in range(10): |
---|
952 | unused_stat_result = host.stat("CONTENTS") |
---|
953 | with host.open("CONTENTS") as fobj: |
---|
954 | unused_data = fobj.read() |
---|
955 | |
---|
956 | def test_garbage_collection(self): |
---|
957 | """Test whether there are cycles which prevent garbage collection.""" |
---|
958 | gc.collect() |
---|
959 | objects_before_test = len(gc.garbage) |
---|
960 | self._make_objects_to_be_garbage_collected() |
---|
961 | gc.collect() |
---|
962 | objects_after_test = len(gc.garbage) |
---|
963 | assert not objects_after_test - objects_before_test |
---|