Changeset 1806:6a898515802d
- Timestamp:
- Jun 17, 2019, 9:57:33 PM (20 months ago)
- Branch:
- default
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
test/test_host.py
r1805 r1806 781 781 """ 782 782 783 def setup_method(self, method):784 self.host = test_base.ftp_host_factory()785 786 783 def test_upload(self): 787 784 """Test whether `upload` accepts either unicode or bytes.""" 788 host = self.host 785 Call = scripted_session.Call 786 host_script = [ 787 Call("__init__"), 788 Call(method_name="pwd", result="/"), 789 Call(method_name="close"), 790 ] 791 file_script = [ 792 Call("__init__"), 793 Call(method_name="pwd", result="/"), 794 Call(method_name="cwd", result=None, args=("/",)), 795 Call(method_name="voidcmd", result=None, args=("TYPE I",)), 796 Call(method_name="transfercmd", 797 result=io.BytesIO(), 798 args=("STOR target", None)), 799 Call(method_name="voidresp", result=None, args=()), 800 Call(method_name="close"), 801 ] 802 multisession_factory = scripted_session.factory(host_script, file_script) 789 803 # The source file needs to be present in the current directory. 790 host.upload("Makefile", "target") 791 host.upload("Makefile", ftputil.tool.as_bytes("target")) 804 with test_base.ftp_host_factory(multisession_factory) as host: 805 host.upload("Makefile", "target") 806 # Create new `BytesIO` object. 807 file_script[4] = Call(method_name="transfercmd", 808 result=io.BytesIO(), 809 args=("STOR target", None)) 810 multisession_factory = scripted_session.factory(host_script, file_script) 811 with test_base.ftp_host_factory(multisession_factory) as host: 812 host.upload("Makefile", ftputil.tool.as_bytes("target")) 792 813 793 814 def test_download(self): 794 815 """Test whether `download` accepts either unicode or bytes.""" 795 host = test_base.ftp_host_factory( 796 session_factory=BinaryDownloadMockSession) 816 Call = scripted_session.Call 817 host_script = [ 818 Call("__init__"), 819 Call(method_name="pwd", result="/"), 820 Call(method_name="close"), 821 ] 822 file_script = [ 823 Call("__init__"), 824 Call(method_name="pwd", result="/"), 825 Call(method_name="cwd", result=None, args=("/",)), 826 Call(method_name="voidcmd", result=None, args=("TYPE I",)), 827 Call(method_name="transfercmd", 828 result=io.BytesIO(), 829 args=("RETR source", None)), 830 Call(method_name="voidresp", result=None, args=()), 831 Call(method_name="close"), 832 ] 797 833 local_file_name = "_local_target_" 798 host.download("source", local_file_name) 799 host.download(ftputil.tool.as_bytes("source"), local_file_name) 834 multisession_factory = scripted_session.factory(host_script, file_script) 835 # The source file needs to be present in the current directory. 836 with test_base.ftp_host_factory(multisession_factory) as host: 837 host.download("source", local_file_name) 838 # Create new `BytesIO` object. 839 file_script[4] = Call(method_name="transfercmd", 840 result=io.BytesIO(), 841 args=("RETR source", None)) 842 multisession_factory = scripted_session.factory(host_script, file_script) 843 with test_base.ftp_host_factory(multisession_factory) as host: 844 host.download(ftputil.tool.as_bytes("source"), local_file_name) 800 845 os.remove(local_file_name) 801 846 802 847 def test_rename(self): 803 848 """Test whether `rename` accepts either unicode or bytes.""" 849 Call = scripted_session.Call 850 script = [ 851 Call("__init__"), 852 Call(method_name="pwd", result="/"), 853 Call(method_name="cwd", result=None, args=("/",)), 854 Call(method_name="rename", result=None, args=("/ä", "/ä")), 855 Call(method_name="close"), 856 ] 804 857 # It's possible to mix argument types, as for `os.rename`. 805 path_as_unicode = "/ home/file_name_test/ä"858 path_as_unicode = "/ä" 806 859 path_as_bytes = ftputil.tool.as_bytes(path_as_unicode) 807 860 paths = [path_as_unicode, path_as_bytes] 808 861 for source_path, target_path in itertools.product(paths, paths): 809 self.host.rename(source_path, target_path) 862 session_factory = scripted_session.factory(script) 863 with test_base.ftp_host_factory(session_factory) as host: 864 host.rename(source_path, target_path) 810 865 811 866 def test_listdir(self): 812 867 """Test whether `listdir` accepts either unicode or bytes.""" 813 host = self.host814 868 as_bytes = ftputil.tool.as_bytes 815 host.chdir("/home/file_name_test") 869 Call = scripted_session.Call 870 top_level_dir_line = test_base.dir_line(mode_string="drwxr-xr-x", 871 date_=datetime.date.today(), 872 name="ä") 873 dir_line1 = test_base.dir_line(mode_string="-rw-r--r--", 874 date_=datetime.date.today(), 875 name="ö") 876 dir_line2 = test_base.dir_line(mode_string="-rw-r--r--", 877 date_=datetime.date.today(), 878 name="o") 879 dir_result = dir_line1 + "\n" + dir_line2 880 script = [ 881 Call("__init__"), 882 Call(method_name="pwd", result="/"), 883 Call(method_name="cwd", result=None, args=("/",)), 884 Call(method_name="cwd", result=None, args=("/",)), 885 Call(method_name="dir", result=top_level_dir_line, args=("",)), 886 Call(method_name="cwd", result=None, args=("/",)), 887 Call(method_name="cwd", result=None, args=("/",)), 888 Call(method_name="cwd", result=None, args=("/ä",)), 889 Call(method_name="dir", result=dir_result, args=("",)), 890 Call(method_name="cwd", result=None, args=("/",)), 891 Call(method_name="close"), 892 ] 816 893 # Unicode 817 items = host.listdir("ä") 894 session_factory = scripted_session.factory(script) 895 with test_base.ftp_host_factory(session_factory) as host: 896 items = host.listdir("ä") 818 897 assert items == ["ö", "o"] 819 898 # Bytes 820 items = host.listdir(as_bytes("ä")) 821 assert items == [as_bytes("ö"), as_bytes("o")] 899 session_factory = scripted_session.factory(script) 900 with test_base.ftp_host_factory(session_factory) as host: 901 items = host.listdir(as_bytes("ä")) 902 assert items == [as_bytes("ö"), as_bytes("o")] 822 903 823 904 def test_chmod(self): 824 905 """Test whether `chmod` accepts either unicode or bytes.""" 825 host = self.host 826 # The `voidcmd` implementation in `MockSession` would raise an 827 # exception for the `CHMOD` command. 828 host._session.voidcmd = host._session._ignore_arguments 829 path = "/home/file_name_test/ä" 830 host.chmod(path, 0o755) 831 host.chmod(ftputil.tool.as_bytes(path), 0o755) 832 833 def _test_method_with_single_path_argument(self, method, path): 834 method(path) 835 method(ftputil.tool.as_bytes(path)) 906 Call = scripted_session.Call 907 script = [ 908 Call("__init__"), 909 Call(method_name="pwd", result="/"), 910 Call(method_name="cwd", result=None, args=("/",)), 911 Call(method_name="cwd", result=None, args=("/",)), 912 Call(method_name="voidcmd", result=None, args=("SITE CHMOD 0755 ä",)), 913 Call(method_name="cwd", result=None, args=("/",)), 914 Call(method_name="close"), 915 ] 916 path = "/ä" 917 # Unicode 918 session_factory = scripted_session.factory(script) 919 with test_base.ftp_host_factory(session_factory) as host: 920 host.chmod(path, 0o755) 921 # Bytes 922 session_factory = scripted_session.factory(script) 923 with test_base.ftp_host_factory(session_factory) as host: 924 host.chmod(ftputil.tool.as_bytes(path), 0o755) 925 926 def _test_method_with_single_path_argument(self, method_name, path, script): 927 # Unicode 928 session_factory = scripted_session.factory(script) 929 with test_base.ftp_host_factory(session_factory) as host: 930 method = getattr(host, method_name) 931 method(path) 932 # Bytes 933 session_factory = scripted_session.factory(script) 934 with test_base.ftp_host_factory(session_factory) as host: 935 method = getattr(host, method_name) 936 method(ftputil.tool.as_bytes(path)) 836 937 837 938 def test_chdir(self): 838 939 """Test whether `chdir` accepts either unicode or bytes.""" 839 self._test_method_with_single_path_argument( 840 self.host.chdir, "/home/file_name_test/ö") 940 Call = scripted_session.Call 941 script = [ 942 Call("__init__"), 943 Call(method_name="pwd", result="/"), 944 Call(method_name="cwd", result=None, args=("/ö",)), 945 Call(method_name="close"), 946 ] 947 self._test_method_with_single_path_argument("chdir", "/ö", script) 841 948 842 949 def test_mkdir(self): 843 950 """Test whether `mkdir` accepts either unicode or bytes.""" 844 # This directory exists already in the mock session, but this 845 # shouldn't matter for the test. 846 self._test_method_with_single_path_argument( 847 self.host.mkdir, "/home/file_name_test/ä") 951 Call = scripted_session.Call 952 script = [ 953 Call("__init__"), 954 Call(method_name="pwd", result="/"), 955 Call(method_name="cwd", result=None, args=("/",)), 956 Call(method_name="cwd", result=None, args=("/",)), 957 Call(method_name="mkd", result=None, args=("ä",)), 958 Call(method_name="cwd", result=None, args=("/",)), 959 Call(method_name="close"), 960 ] 961 self._test_method_with_single_path_argument("mkdir", "/ä", script) 848 962 849 963 def test_makedirs(self): 850 964 """Test whether `makedirs` accepts either unicode or bytes.""" 851 self._test_method_with_single_path_argument( 852 self.host.makedirs, "/home/file_name_test/ä") 965 Call = scripted_session.Call 966 script = [ 967 Call("__init__"), 968 Call(method_name="pwd", result="/"), 969 # To deal with ticket #86 (virtual directories), `makedirs` tries to 970 # change into each directory and if it exists (changing doesn't raise 971 # an exception), doesn't try to create it. That's why you don't see 972 # an `mkd` calls here despite originally having a `makedirs` call. 973 Call(method_name="cwd", result=None, args=("/ä",)), 974 Call(method_name="cwd", result=None, args=("/ä/ö",)), 975 Call(method_name="cwd", result=None, args=("/",)), 976 Call(method_name="close"), 977 ] 978 self._test_method_with_single_path_argument("makedirs", "/ä/ö", script) 853 979 854 980 def test_rmdir(self): 855 981 """Test whether `rmdir` accepts either unicode or bytes.""" 856 empty_directory_as_required_by_rmdir = "/home/file_name_test/empty_ä" 982 Call = scripted_session.Call 983 dir_line = test_base.dir_line(mode_string="drwxr-xr-x", 984 date_=datetime.date.today(), 985 name="empty_ä") 986 # Since the session script isn't at all obvious, I checked it with a 987 # debugger and added comments on some of the calls that happen during 988 # the `rmdir` call. 989 # 990 # `_robust_ftp_command` descends one directory at a time (see ticket 991 # #11) and restores the original directory in the end, which results in 992 # at least four calls on the FTP session object (`cwd`, `cwd`, actual 993 # method, `cwd`). It would be great if all the roundtrips to the server 994 # could be reduced. 995 script = [ 996 # `FTPHost` initialization 997 Call("__init__"), 998 Call(method_name="pwd", result="/"), 999 # `host.rmdir("/empty_ä")` 1000 # `host.listdir("/empty_ä")` 1001 # `host._stat._listdir("/empty_ä")` 1002 # `host._stat.__call_with_parser_retry("/empty_ä")` 1003 # `host._stat._real_listdir("/empty_ä")` 1004 # `host.path.isdir("/empty_ä")` 1005 Call(method_name="cwd", result=None, args=("/",)), 1006 Call(method_name="cwd", result=None, args=("/",)), 1007 Call(method_name="dir", result=dir_line, args=("",)), 1008 Call(method_name="cwd", result=None, args=("/",)), 1009 # `host.path.isdir` end 1010 # `host._stat._stat_results_from_dir("/empty_ä")` 1011 Call(method_name="cwd", result=None, args=("/",)), 1012 Call(method_name="cwd", result=None, args=("/empty_ä",)), 1013 Call(method_name="dir", result="", args=("",)), 1014 Call(method_name="cwd", result=None, args=("/",)), 1015 # `host._stat._stat_results_from_dir("/empty_ä")` end 1016 # `host._session.rmd` in `host._robust_ftp_command` 1017 # `host._check_inaccessible_login_directory()` 1018 Call(method_name="cwd", result=None, args=("/",)), 1019 # `host.chdir(head)` ("/") 1020 Call(method_name="cwd", result=None, args=("/",)), 1021 # `host.rmd(tail)` ("empty_ä") 1022 Call(method_name="rmd", result=None, args=("empty_ä",)), 1023 # `host.chdir(old_dir)` ("/") 1024 Call(method_name="cwd", result=None, args=("/",)), 1025 # 1026 Call(method_name="close") 1027 ] 1028 empty_directory_as_required_by_rmdir = "/empty_ä" 857 1029 self._test_method_with_single_path_argument( 858 self.host.rmdir, empty_directory_as_required_by_rmdir)1030 "rmdir", empty_directory_as_required_by_rmdir, script) 859 1031 860 1032 def test_remove(self): 861 1033 """Test whether `remove` accepts either unicode or bytes.""" 862 self._test_method_with_single_path_argument( 863 self.host.remove, "/home/file_name_test/ö") 1034 Call = scripted_session.Call 1035 dir_line = test_base.dir_line(mode_string="-rw-r--r--", 1036 date_=datetime.date.today(), 1037 name="ö") 1038 script = [ 1039 Call("__init__"), 1040 Call(method_name="pwd", result="/"), 1041 Call(method_name="cwd", result=None, args=("/",)), 1042 Call(method_name="cwd", result=None, args=("/",)), 1043 Call(method_name="dir", result=dir_line, args=("",)), 1044 Call(method_name="cwd", result=None, args=("/",)), 1045 Call(method_name="cwd", result=None, args=("/",)), 1046 Call(method_name="cwd", result=None, args=("/",)), 1047 Call(method_name="delete", result=None, args=("ö",)), 1048 Call(method_name="cwd", result=None, args=("/",)), 1049 Call(method_name="close"), 1050 ] 1051 self._test_method_with_single_path_argument("remove", "/ö", script) 864 1052 865 1053 def test_rmtree(self): 866 1054 """Test whether `rmtree` accepts either unicode or bytes.""" 867 empty_directory_as_required_by_rmtree = "/home/file_name_test/empty_ä" 1055 Call = scripted_session.Call 1056 dir_line = test_base.dir_line(mode_string="drwxr-xr-x", 1057 date_=datetime.date.today(), 1058 name="empty_ä") 1059 script = [ 1060 Call("__init__"), 1061 Call(method_name="pwd", result="/"), 1062 Call(method_name="cwd", result=None, args=("/",)), 1063 Call(method_name="cwd", result=None, args=("/",)), 1064 # Recursive `listdir` 1065 # Check parent (root) directory. 1066 Call(method_name="dir", result=dir_line, args=("",)), 1067 Call(method_name="cwd", result=None, args=("/",)), 1068 Call(method_name="cwd", result=None, args=("/",)), 1069 Call(method_name="cwd", result=None, args=("/empty_ä",)), 1070 # Child directory (inside `empty_ä`) 1071 Call(method_name="dir", result="", args=("",)), 1072 Call(method_name="cwd", result=None, args=("/",)), 1073 Call(method_name="cwd", result=None, args=("/",)), 1074 Call(method_name="cwd", result=None, args=("/empty_ä",)), 1075 # Recursive `rmdir` (repeated `cwd` calls because of 1076 # `_robust_ftp_command`) 1077 Call(method_name="dir", result="", args=("",)), 1078 Call(method_name="cwd", result=None, args=("/",)), 1079 Call(method_name="cwd", result=None, args=("/",)), 1080 Call(method_name="cwd", result=None, args=("/",)), 1081 Call(method_name="rmd", result=None, args=("empty_ä",)), 1082 Call(method_name="cwd", result=None, args=("/",)), 1083 Call(method_name="close"), 1084 ] 1085 empty_directory_as_required_by_rmtree = "/empty_ä" 868 1086 self._test_method_with_single_path_argument( 869 self.host.rmtree, empty_directory_as_required_by_rmtree)1087 "rmtree", empty_directory_as_required_by_rmtree, script) 870 1088 871 1089 def test_lstat(self): 872 1090 """Test whether `lstat` accepts either unicode or bytes.""" 873 self._test_method_with_single_path_argument( 874 self.host.lstat, "/home/file_name_test/ä") 1091 Call = scripted_session.Call 1092 dir_line = test_base.dir_line(mode_string="-rw-r--r--", 1093 date_=datetime.date.today(), 1094 name="ä") 1095 script = [ 1096 Call("__init__"), 1097 Call(method_name="pwd", result="/"), 1098 Call(method_name="cwd", result=None, args=("/",)), 1099 Call(method_name="cwd", result=None, args=("/",)), 1100 Call(method_name="dir", result=dir_line, args=("",)), 1101 Call(method_name="cwd", result=None, args=("/",)), 1102 Call(method_name="close"), 1103 ] 1104 self._test_method_with_single_path_argument("lstat", "/ä", script) 875 1105 876 1106 def test_stat(self): 877 1107 """Test whether `stat` accepts either unicode or bytes.""" 878 self._test_method_with_single_path_argument( 879 self.host.stat, "/home/file_name_test/ä") 1108 Call = scripted_session.Call 1109 dir_line = test_base.dir_line(mode_string="-rw-r--r--", 1110 date_=datetime.date.today(), 1111 name="ä") 1112 script = [ 1113 Call("__init__"), 1114 Call(method_name="pwd", result="/"), 1115 Call(method_name="cwd", result=None, args=("/",)), 1116 Call(method_name="cwd", result=None, args=("/",)), 1117 Call(method_name="dir", result=dir_line, args=("",)), 1118 Call(method_name="cwd", result=None, args=("/",)), 1119 Call(method_name="close"), 1120 ] 1121 self._test_method_with_single_path_argument("stat", "/ä", script) 880 1122 881 1123 def test_walk(self): 882 1124 """Test whether `walk` accepts either unicode or bytes.""" 1125 Call = scripted_session.Call 1126 dir_line = test_base.dir_line(mode_string="-rw-r--r--", 1127 date_=datetime.date.today(), 1128 name="ä") 1129 script = [ 1130 Call("__init__"), 1131 Call(method_name="pwd", result="/"), 1132 Call(method_name="cwd", result=None, args=("/",)), 1133 Call(method_name="cwd", result=None, args=("/",)), 1134 Call(method_name="dir", result=dir_line, args=("",)), 1135 Call(method_name="cwd", result=None, args=("/",)), 1136 Call(method_name="close"), 1137 ] 883 1138 # We're not interested in the return value of `walk`. 884 self._test_method_with_single_path_argument( 885 self.host.walk, "/home/file_name_test/ä") 1139 # Unicode 1140 session_factory = scripted_session.factory(script) 1141 with test_base.ftp_host_factory(session_factory) as host: 1142 result = list(host.walk("/ä")) 1143 # Bytes 1144 session_factory = scripted_session.factory(script) 1145 with test_base.ftp_host_factory(session_factory) as host: 1146 result = list(host.walk(ftputil.tool.as_bytes("/ä"))) 886 1147 887 1148
Note: See TracChangeset
for help on using the changeset viewer.