| 
									
										
										
										
											2016-04-23 21:30:44 +08:00
										 |  |  | #!/usr/bin/env python | 
					
						
							|  |  |  | # coding: utf-8 | 
					
						
							|  |  |  | from __future__ import unicode_literals | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Allow direct execution | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  | import random | 
					
						
							|  |  |  | import subprocess | 
					
						
							| 
									
										
										
										
											2016-04-23 21:30:44 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  | from test.helper import ( | 
					
						
							|  |  |  |     FakeYDL, | 
					
						
							|  |  |  |     get_params, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from youtube_dl.compat import ( | 
					
						
							|  |  |  |     compat_str, | 
					
						
							|  |  |  |     compat_urllib_request, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2016-04-23 21:30:44 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TestMultipleSocks(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2016-04-23 21:30:44 +08:00
										 |  |  |     @staticmethod | 
					
						
							|  |  |  |     def _check_params(attrs): | 
					
						
							|  |  |  |         params = get_params() | 
					
						
							|  |  |  |         for attr in attrs: | 
					
						
							|  |  |  |             if attr not in params: | 
					
						
							|  |  |  |                 print('Missing %s. Skipping.' % attr) | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  |         return params | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_http(self): | 
					
						
							|  |  |  |         params = self._check_params(['primary_proxy', 'primary_server_ip']) | 
					
						
							|  |  |  |         if params is None: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         ydl = FakeYDL({ | 
					
						
							|  |  |  |             'proxy': params['primary_proxy'] | 
					
						
							|  |  |  |         }) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8'), | 
					
						
							|  |  |  |             params['primary_server_ip']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy_https(self): | 
					
						
							|  |  |  |         params = self._check_params(['primary_proxy', 'primary_server_ip']) | 
					
						
							|  |  |  |         if params is None: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         ydl = FakeYDL({ | 
					
						
							|  |  |  |             'proxy': params['primary_proxy'] | 
					
						
							|  |  |  |         }) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ydl.urlopen('https://yt-dl.org/ip').read().decode('utf-8'), | 
					
						
							|  |  |  |             params['primary_server_ip']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_secondary_proxy_http(self): | 
					
						
							|  |  |  |         params = self._check_params(['secondary_proxy', 'secondary_server_ip']) | 
					
						
							|  |  |  |         if params is None: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         ydl = FakeYDL() | 
					
						
							|  |  |  |         req = compat_urllib_request.Request('http://yt-dl.org/ip') | 
					
						
							|  |  |  |         req.add_header('Ytdl-request-proxy', params['secondary_proxy']) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ydl.urlopen(req).read().decode('utf-8'), | 
					
						
							|  |  |  |             params['secondary_server_ip']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_secondary_proxy_https(self): | 
					
						
							|  |  |  |         params = self._check_params(['secondary_proxy', 'secondary_server_ip']) | 
					
						
							|  |  |  |         if params is None: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         ydl = FakeYDL() | 
					
						
							|  |  |  |         req = compat_urllib_request.Request('https://yt-dl.org/ip') | 
					
						
							|  |  |  |         req.add_header('Ytdl-request-proxy', params['secondary_proxy']) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ydl.urlopen(req).read().decode('utf-8'), | 
					
						
							|  |  |  |             params['secondary_server_ip']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  | class TestSocks(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2016-05-14 18:48:36 +08:00
										 |  |  |     _SKIP_SOCKS_TEST = True | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  |     def setUp(self): | 
					
						
							| 
									
										
										
										
											2016-05-14 18:48:36 +08:00
										 |  |  |         if self._SKIP_SOCKS_TEST: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-08 15:16:32 +08:00
										 |  |  |         self.port = random.randint(20000, 30000) | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  |         self.server_process = subprocess.Popen([ | 
					
						
							|  |  |  |             'srelay', '-f', '-i', '127.0.0.1:%d' % self.port], | 
					
						
							|  |  |  |             stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							| 
									
										
										
										
											2016-05-14 18:48:36 +08:00
										 |  |  |         if self._SKIP_SOCKS_TEST: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  |         self.server_process.terminate() | 
					
						
							|  |  |  |         self.server_process.communicate() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _get_ip(self, protocol): | 
					
						
							| 
									
										
										
										
											2016-05-14 18:48:36 +08:00
										 |  |  |         if self._SKIP_SOCKS_TEST: | 
					
						
							|  |  |  |             return '127.0.0.1' | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-05 17:09:13 +08:00
										 |  |  |         ydl = FakeYDL({ | 
					
						
							|  |  |  |             'proxy': '%s://127.0.0.1:%d' % (protocol, self.port), | 
					
						
							|  |  |  |         }) | 
					
						
							|  |  |  |         return ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_socks4(self): | 
					
						
							|  |  |  |         self.assertTrue(isinstance(self._get_ip('socks4'), compat_str)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_socks4a(self): | 
					
						
							|  |  |  |         self.assertTrue(isinstance(self._get_ip('socks4a'), compat_str)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_socks5(self): | 
					
						
							|  |  |  |         self.assertTrue(isinstance(self._get_ip('socks5'), compat_str)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-04-23 21:30:44 +08:00
										 |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     unittest.main() |