| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | #!/usr/bin/env python | 
					
						
							| 
									
										
										
										
											2016-03-23 22:24:52 +08:00
										 |  |  | # coding: utf-8 | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | from __future__ import unicode_literals | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Allow direct execution | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import unittest | 
					
						
							|  |  |  | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from youtube_dl import YoutubeDL | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  | from youtube_dl.compat import compat_http_server, compat_urllib_request | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | import ssl | 
					
						
							|  |  |  | import threading | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-30 03:06:40 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  | def http_server_port(httpd): | 
					
						
							| 
									
										
										
										
											2016-05-29 19:24:28 +08:00
										 |  |  |     if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket): | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  |         # In Jython SSLSocket is not a subclass of socket.socket | 
					
						
							|  |  |  |         sock = httpd.socket.sock | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         sock = httpd.socket | 
					
						
							|  |  |  |     return sock.getsockname()[1] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): | 
					
						
							|  |  |  |     def log_message(self, format, *args): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def do_GET(self): | 
					
						
							|  |  |  |         if self.path == '/video.html': | 
					
						
							|  |  |  |             self.send_response(200) | 
					
						
							|  |  |  |             self.send_header('Content-Type', 'text/html; charset=utf-8') | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |             self.wfile.write(b'<html><video src="/vid.mp4" /></html>') | 
					
						
							|  |  |  |         elif self.path == '/vid.mp4': | 
					
						
							|  |  |  |             self.send_response(200) | 
					
						
							|  |  |  |             self.send_header('Content-Type', 'video/mp4') | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |             self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  |         elif self.path == '/302': | 
					
						
							|  |  |  |             if sys.version_info[0] == 3: | 
					
						
							|  |  |  |                 # XXX: Python 3 http server does not allow non-ASCII header values | 
					
						
							|  |  |  |                 self.send_response(404) | 
					
						
							|  |  |  |                 self.end_headers() | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             new_url = 'http://localhost:%d/中文.html' % http_server_port(self.server) | 
					
						
							|  |  |  |             self.send_response(302) | 
					
						
							|  |  |  |             self.send_header(b'Location', new_url.encode('utf-8')) | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |         elif self.path == '/%E4%B8%AD%E6%96%87.html': | 
					
						
							|  |  |  |             self.send_response(200) | 
					
						
							|  |  |  |             self.send_header('Content-Type', 'text/html; charset=utf-8') | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |             self.wfile.write(b'<html><video src="/vid.mp4" /></html>') | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  |         else: | 
					
						
							|  |  |  |             assert False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class FakeLogger(object): | 
					
						
							|  |  |  |     def debug(self, msg): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def warning(self, msg): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def error(self, msg): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestHTTP(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.httpd = compat_http_server.HTTPServer( | 
					
						
							|  |  |  |             ('localhost', 0), HTTPTestRequestHandler) | 
					
						
							|  |  |  |         self.port = http_server_port(self.httpd) | 
					
						
							|  |  |  |         self.server_thread = threading.Thread(target=self.httpd.serve_forever) | 
					
						
							|  |  |  |         self.server_thread.daemon = True | 
					
						
							|  |  |  |         self.server_thread.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_unicode_path_redirection(self): | 
					
						
							|  |  |  |         # XXX: Python 3 http server does not allow non-ASCII header values | 
					
						
							|  |  |  |         if sys.version_info[0] == 3: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ydl = YoutubeDL({'logger': FakeLogger()}) | 
					
						
							|  |  |  |         r = ydl.extract_info('http://localhost:%d/302' % self.port) | 
					
						
							| 
									
										
										
										
											2016-10-12 01:41:41 +08:00
										 |  |  |         self.assertEqual(r['entries'][0]['url'], 'http://localhost:%d/vid.mp4' % self.port) | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestHTTPS(unittest.TestCase): | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  |     def setUp(self): | 
					
						
							|  |  |  |         certfn = os.path.join(TEST_DIR, 'testcert.pem') | 
					
						
							|  |  |  |         self.httpd = compat_http_server.HTTPServer( | 
					
						
							|  |  |  |             ('localhost', 0), HTTPTestRequestHandler) | 
					
						
							|  |  |  |         self.httpd.socket = ssl.wrap_socket( | 
					
						
							|  |  |  |             self.httpd.socket, certfile=certfn, server_side=True) | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  |         self.port = http_server_port(self.httpd) | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  |         self.server_thread = threading.Thread(target=self.httpd.serve_forever) | 
					
						
							|  |  |  |         self.server_thread.daemon = True | 
					
						
							|  |  |  |         self.server_thread.start() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_nocheckcertificate(self): | 
					
						
							|  |  |  |         if sys.version_info >= (2, 7, 9):  # No certificate checking anyways | 
					
						
							|  |  |  |             ydl = YoutubeDL({'logger': FakeLogger()}) | 
					
						
							|  |  |  |             self.assertRaises( | 
					
						
							|  |  |  |                 Exception, | 
					
						
							|  |  |  |                 ydl.extract_info, 'https://localhost:%d/video.html' % self.port) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) | 
					
						
							|  |  |  |         r = ydl.extract_info('https://localhost:%d/video.html' % self.port) | 
					
						
							| 
									
										
										
										
											2016-10-12 01:41:41 +08:00
										 |  |  |         self.assertEqual(r['entries'][0]['url'], 'https://localhost:%d/vid.mp4' % self.port) | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | def _build_proxy_handler(name): | 
					
						
							|  |  |  |     class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): | 
					
						
							|  |  |  |         proxy_name = name | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def log_message(self, format, *args): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def do_GET(self): | 
					
						
							|  |  |  |             self.send_response(200) | 
					
						
							|  |  |  |             self.send_header('Content-Type', 'text/plain; charset=utf-8') | 
					
						
							|  |  |  |             self.end_headers() | 
					
						
							|  |  |  |             self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8')) | 
					
						
							|  |  |  |     return HTTPTestRequestHandler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestProxy(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         self.proxy = compat_http_server.HTTPServer( | 
					
						
							|  |  |  |             ('localhost', 0), _build_proxy_handler('normal')) | 
					
						
							| 
									
										
										
										
											2016-05-26 17:24:40 +08:00
										 |  |  |         self.port = http_server_port(self.proxy) | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  |         self.proxy_thread = threading.Thread(target=self.proxy.serve_forever) | 
					
						
							|  |  |  |         self.proxy_thread.daemon = True | 
					
						
							|  |  |  |         self.proxy_thread.start() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-07-03 23:50:55 +08:00
										 |  |  |         self.geo_proxy = compat_http_server.HTTPServer( | 
					
						
							|  |  |  |             ('localhost', 0), _build_proxy_handler('geo')) | 
					
						
							|  |  |  |         self.geo_port = http_server_port(self.geo_proxy) | 
					
						
							|  |  |  |         self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever) | 
					
						
							|  |  |  |         self.geo_proxy_thread.daemon = True | 
					
						
							|  |  |  |         self.geo_proxy_thread.start() | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_proxy(self): | 
					
						
							| 
									
										
										
										
											2016-07-03 23:50:55 +08:00
										 |  |  |         geo_proxy = 'localhost:{0}'.format(self.geo_port) | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  |         ydl = YoutubeDL({ | 
					
						
							|  |  |  |             'proxy': 'localhost:{0}'.format(self.port), | 
					
						
							| 
									
										
										
										
											2016-07-03 23:50:55 +08:00
										 |  |  |             'geo_verification_proxy': geo_proxy, | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  |         }) | 
					
						
							|  |  |  |         url = 'http://foo.com/bar' | 
					
						
							|  |  |  |         response = ydl.urlopen(url).read().decode('utf-8') | 
					
						
							|  |  |  |         self.assertEqual(response, 'normal: {0}'.format(url)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         req = compat_urllib_request.Request(url) | 
					
						
							| 
									
										
										
										
											2016-07-03 23:50:55 +08:00
										 |  |  |         req.add_header('Ytdl-request-proxy', geo_proxy) | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  |         response = ydl.urlopen(req).read().decode('utf-8') | 
					
						
							| 
									
										
										
										
											2016-07-03 23:50:55 +08:00
										 |  |  |         self.assertEqual(response, 'geo: {0}'.format(url)) | 
					
						
							| 
									
										
										
										
											2015-03-20 14:59:38 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-23 22:24:52 +08:00
										 |  |  |     def test_proxy_with_idn(self): | 
					
						
							|  |  |  |         ydl = YoutubeDL({ | 
					
						
							|  |  |  |             'proxy': 'localhost:{0}'.format(self.port), | 
					
						
							|  |  |  |         }) | 
					
						
							|  |  |  |         url = 'http://中文.tw/' | 
					
						
							|  |  |  |         response = ydl.urlopen(url).read().decode('utf-8') | 
					
						
							|  |  |  |         # b'xn--fiq228c' is '中文'.encode('idna') | 
					
						
							|  |  |  |         self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-11-17 19:42:56 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-30 02:57:37 +01:00
										 |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     unittest.main() |