| 1 | |
|---|
| 2 | |
|---|
| 3 | |
|---|
| 4 | import os |
|---|
| 5 | import subprocess |
|---|
| 6 | import unittest |
|---|
| 7 | |
|---|
| 8 | import ftputil |
|---|
| 9 | |
|---|
| 10 | |
|---|
| 11 | def email_address(): |
|---|
| 12 | """ |
|---|
| 13 | Return the email address used to identify the client to an |
|---|
| 14 | FTP server. |
|---|
| 15 | |
|---|
| 16 | If the hostname is "warpy", use my (Stefan's) email address, |
|---|
| 17 | else try to use the content of the $EMAIL environment variable. |
|---|
| 18 | If that doesn't exist, use a dummy address. |
|---|
| 19 | """ |
|---|
| 20 | try: |
|---|
| 21 | fobj = open("/etc/hostname") |
|---|
| 22 | hostname = fobj.read().strip() |
|---|
| 23 | finally: |
|---|
| 24 | fobj.close() |
|---|
| 25 | if hostname == "warpy": |
|---|
| 26 | email = "sschwarzer@sschwarzer.net" |
|---|
| 27 | else: |
|---|
| 28 | dummy_address = "anonymous@example.com" |
|---|
| 29 | email = os.environ.get("EMAIL", dummy_address) |
|---|
| 30 | if not email: |
|---|
| 31 | |
|---|
| 32 | email = dummy_address |
|---|
| 33 | return email |
|---|
| 34 | |
|---|
| 35 | EMAIL = email_address() |
|---|
| 36 | |
|---|
| 37 | |
|---|
| 38 | def ftp_client_listing(server, directory): |
|---|
| 39 | """ |
|---|
| 40 | Log into the FTP server `server` using the command line |
|---|
| 41 | client, then change to the `directory` and retrieve a |
|---|
| 42 | listing with "dir". Return the list of items found as the |
|---|
| 43 | an `os.listdir` would return it. |
|---|
| 44 | """ |
|---|
| 45 | |
|---|
| 46 | ftp_popen = subprocess.Popen(["ftp", "-n", server], |
|---|
| 47 | stdin=subprocess.PIPE, |
|---|
| 48 | stdout=subprocess.PIPE, |
|---|
| 49 | universal_newlines=True) |
|---|
| 50 | commands = ["user anonymous %s" % EMAIL, "dir", "bye"] |
|---|
| 51 | if directory: |
|---|
| 52 | |
|---|
| 53 | commands.insert(1, "cd %s" % directory) |
|---|
| 54 | input_ = "\n".join(commands) |
|---|
| 55 | stdout, stderr = ftp_popen.communicate(input_) |
|---|
| 56 | |
|---|
| 57 | names = [] |
|---|
| 58 | for line in stdout.strip().split("\n"): |
|---|
| 59 | if line.startswith("total "): |
|---|
| 60 | continue |
|---|
| 61 | parts = line.split() |
|---|
| 62 | if parts[-2] == "->": |
|---|
| 63 | |
|---|
| 64 | name = parts[-3] |
|---|
| 65 | else: |
|---|
| 66 | name = parts[-1] |
|---|
| 67 | names.append(name) |
|---|
| 68 | |
|---|
| 69 | names = [name for name in names if name not in (".", "..")] |
|---|
| 70 | return names |
|---|
| 71 | |
|---|
| 72 | |
|---|
| 73 | class TestPublicServers(unittest.TestCase): |
|---|
| 74 | """ |
|---|
| 75 | Get directory listings from various public FTP servers |
|---|
| 76 | with a command line client and ftputil and compare both. |
|---|
| 77 | |
|---|
| 78 | An important aspect is to test different "spellings" of |
|---|
| 79 | the same directory. For example, to list the root directory |
|---|
| 80 | which is usually set after login, use "" (nothing), ".", |
|---|
| 81 | "/", "/.", "./.", "././", "..", "../.", "../.." etc. |
|---|
| 82 | |
|---|
| 83 | The command line client "ftp" has to be in the path. |
|---|
| 84 | """ |
|---|
| 85 | |
|---|
| 86 | |
|---|
| 87 | |
|---|
| 88 | |
|---|
| 89 | |
|---|
| 90 | |
|---|
| 91 | |
|---|
| 92 | |
|---|
| 93 | |
|---|
| 94 | |
|---|
| 95 | |
|---|
| 96 | |
|---|
| 97 | servers = [ |
|---|
| 98 | ("ftp.gnome.org", "pub"), |
|---|
| 99 | ("ftp.debian.org", "debian"), |
|---|
| 100 | ("ftp.sunfreeware.com", "pub"), |
|---|
| 101 | ("ftp.chello.nl", "pub"), |
|---|
| 102 | ("ftp.heanet.ie", "pub"), |
|---|
| 103 | |
|---|
| 104 | ("ftp.microsoft.com", "deskapps")] |
|---|
| 105 | |
|---|
| 106 | |
|---|
| 107 | |
|---|
| 108 | |
|---|
| 109 | |
|---|
| 110 | |
|---|
| 111 | |
|---|
| 112 | paths_table = [ |
|---|
| 113 | (".", ["", ".", "/", "/.", "./.", "././", "..", "../.", "../..", |
|---|
| 114 | "DIR/..", "/DIR/../.", "/DIR/../.."]), |
|---|
| 115 | ("DIR", ["", ".", "/DIR", "/DIR/", "../DIR", "../../DIR"]) |
|---|
| 116 | ] |
|---|
| 117 | |
|---|
| 118 | def inner_test_server(self, server, initial_directory, paths): |
|---|
| 119 | """ |
|---|
| 120 | Test one server for one initial directory. |
|---|
| 121 | |
|---|
| 122 | Connect to the server `server`; if the string argument |
|---|
| 123 | `initial_directory` has a true value, change to this |
|---|
| 124 | directory. Then iterate over all strings in the sequence |
|---|
| 125 | `paths`, comparing the results of a listdir call with the |
|---|
| 126 | listing from the command line client. |
|---|
| 127 | """ |
|---|
| 128 | canonical_names = ftp_client_listing(server, initial_directory) |
|---|
| 129 | host = ftputil.FTPHost(server, "anonymous", EMAIL) |
|---|
| 130 | try: |
|---|
| 131 | host.chdir(initial_directory) |
|---|
| 132 | for path in paths: |
|---|
| 133 | path = path.replace("DIR", initial_directory) |
|---|
| 134 | |
|---|
| 135 | |
|---|
| 136 | |
|---|
| 137 | host.stat_cache.clear() |
|---|
| 138 | names = host.listdir(path) |
|---|
| 139 | failure_message = "For server %s, directory %s: %s != %s" % \ |
|---|
| 140 | (server, initial_directory, names, |
|---|
| 141 | canonical_names) |
|---|
| 142 | self.assertEqual(names, canonical_names) |
|---|
| 143 | finally: |
|---|
| 144 | host.close() |
|---|
| 145 | |
|---|
| 146 | def test_servers(self): |
|---|
| 147 | """ |
|---|
| 148 | Test all servers in `self.servers`. |
|---|
| 149 | |
|---|
| 150 | For each server, get the listings for the login directory and |
|---|
| 151 | one other directory which is known to exist. Use different |
|---|
| 152 | "spellings" to retrieve each list via ftputil and compare with |
|---|
| 153 | the results gotten with the command line client. |
|---|
| 154 | """ |
|---|
| 155 | for server, actual_initial_directory in self.servers: |
|---|
| 156 | for initial_directory, paths in self.paths_table: |
|---|
| 157 | initial_directory = initial_directory.replace( |
|---|
| 158 | "DIR", actual_initial_directory) |
|---|
| 159 | print server, initial_directory |
|---|
| 160 | self.inner_test_server(server, initial_directory, paths) |
|---|
| 161 | |
|---|
| 162 | |
|---|
| 163 | if __name__ == '__main__': |
|---|
| 164 | unittest.main() |
|---|