|
|
import os |
|
|
import requests |
|
|
from flask import Flask, Response, request, stream_with_context |
|
|
from urllib.parse import urlparse, urljoin, urlunparse |
|
|
import re |
|
|
import random |
|
|
import logging |
|
|
import json |
|
|
|
|
|
# WSGI application object; the proxy routes below are registered on it.
app = Flask(__name__)
|
|
# Name of the environment variable (deployment secret) holding the upstream URL.
SECRET_NAME_FOR_TARGET_URL = "TARGET_HF_SPACE_URL"

# Upstream base URL read once at import time; None when the secret is not set.
TARGET_URL_FROM_SECRET = os.environ.get(SECRET_NAME_FOR_TARGET_URL)

# Timeout, in seconds, passed to every upstream `requests` call.
REQUEST_TIMEOUT = 45
|
|
|
|
|
|
|
|
# Only surface errors from Werkzeug's per-request access logger.
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)

# Outside debug/development the application logger is raised above CRITICAL,
# which silences every app.logger call; in debug/development it logs at INFO.
# NOTE(review): CRITICAL + 1 also hides app.logger.critical(...) messages,
# including the "secret is not set" report emitted by the request handler.
_debug_or_dev = (
    os.environ.get('FLASK_DEBUG', '0') == '1'
    or os.environ.get('FLASK_ENV') == 'development'
)
if not _debug_or_dev:
    app.logger.setLevel(logging.CRITICAL + 1)
else:
    app.logger.setLevel(logging.INFO)

# Startup diagnostics go to stdout because the app logger may be silenced.
print(f"APP_STARTUP: TARGET_URL_FROM_SECRET: {'SET' if TARGET_URL_FROM_SECRET else 'NOT SET'}")
if TARGET_URL_FROM_SECRET:
    # NOTE(review): this prints the first 20 characters of a value treated as a
    # secret; confirm that exposing the prefix in logs is acceptable.
    print(f"APP_STARTUP: Target URL is '{TARGET_URL_FROM_SECRET[:20]}...'")
|
|
|
|
|
# Browser User-Agent strings; one is picked at random for each upstream request.
COMMON_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
]
|
|
|
|
|
# Mapping of UI text found on the proxied page to its replacement text.
# An empty replacement marks text whose element is meant to be hidden (see the
# `.gr-proxy-item-hidden-by-text` rule in GRADIO_HIDE_CSS_STANDARD).
# NOTE(review): "Gradio" precedes "Built with Gradio"; a consumer that applies
# the entries sequentially in dict order would rewrite the longer phrase to
# "Built with App" before its own entry can match - confirm against the consumer.
SIMPLE_TEXT_REPLACEMENTS = {
    "Aya Vision": "تحلیل",
    "Chat with Aya": "چت🤖",
    "Visualize with Aya": "عکس",
    "Speak with Aya": "",

    "Gradio": "App",
    "Built with Gradio": "",
}
|
|
|
|
|
# <style> element meant for injection into proxied HTML.
# Rule 1 hides the Gradio footer and the Gradio "Settings" button (the two
# in-string CSS comments label those selector groups).
# Rule 2 defines `.gr-proxy-item-hidden-by-text`, a helper class for elements
# whose text was replaced with an empty string.
GRADIO_HIDE_CSS_STANDARD = """
<style>
    /* فوتر Gradio */
    .gradio-container .meta-footer, .gradio-container footer, div[class*="footer"], footer, a[href*="gradio.app"],
    /* دکمه Settings Gradio */
    .gradio-container button[id*="settings"], .gradio-container div[class*="settings-button"],
    a[href*="gradio.app/"], button[title*="Settings"], button[aria-label*="Settings"], div[data-testid*="settings"] {
        display: none !important;
        visibility: hidden !important;
        opacity: 0 !important;
        width: 0 !important;
        height: 0 !important;
        overflow: hidden !important;
        margin: 0 !important;
        padding: 0 !important;
        border: none !important;
        font-size: 0 !important;
        line-height: 0 !important;
    }
    /* کلاس برای مخفی کردن المانهایی که متن آنها با رشته خالی جایگزین شده */
    .gr-proxy-item-hidden-by-text {
        display: none !important;
        visibility: hidden !important;
    }
</style>
"""
|
|
|
|
|
def process_html_content_server_side(html_string):
    """Server-side HTML clean-up hook (regex removals only; no CSS injection).

    The function is currently a pass-through: no removal rules are defined,
    so the input is returned unchanged.

    Args:
        html_string: HTML document text to process.

    Returns:
        The processed HTML; at present the same value that was passed in.
    """
    processed_html = html_string
    return processed_html
|
|
|
|
|
# Incoming request headers that are never copied onto the upstream request.
_EXCLUDED_INCOMING_HEADERS = frozenset({
    'host', 'cookie', 'connection', 'upgrade-insecure-requests',
    'if-none-match', 'if-modified-since', 'referer', 'x-hf-space-host',
    'content-length',
})

# Upstream response headers dropped from successful (non-redirect) responses.
_EXCLUDED_RESPONSE_HEADERS = frozenset({
    'content-encoding', 'transfer-encoding', 'connection', 'keep-alive',
    'x-frame-options', 'strict-transport-security', 'public-key-pins',
    'content-length', 'server', 'x-powered-by', 'date',
    'link',
    'x-canonical-url',
})

# Headers the proxy sets itself on successful responses. They are filtered out
# of the upstream copy case-insensitively so that an upstream 'content-type'
# cannot coexist with the proxy's 'Content-Type'.
_OVERRIDDEN_RESPONSE_HEADERS = frozenset({
    'content-type', 'cache-control', 'pragma', 'expires', 'referrer-policy',
})

# Upstream headers dropped from relayed error (>= 400) responses: the body is
# re-sent decoded and fully buffered, and hop-by-hop headers must not be relayed.
_EXCLUDED_ERROR_RESPONSE_HEADERS = frozenset({
    'content-encoding', 'transfer-encoding', 'content-length',
    'connection', 'keep-alive',
})

# Upstream headers copied onto rewritten redirect responses.
_REDIRECT_PASSTHROUGH_HEADERS = frozenset({
    'set-cookie', 'cache-control', 'expires', 'pragma', 'vary',
})

_PROXIED_METHODS = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD']


def _build_target_url(base_url, path, query_string):
    """Return the upstream URL for a proxied path and raw query string (bytes)."""
    # Plain concatenation instead of urljoin(): urljoin() interprets a first
    # path segment containing ':' (e.g. "foo:bar") as a URL scheme and returns
    # it verbatim, discarding the target base URL.
    url = base_url.rstrip('/') + '/' + path.lstrip('/')
    if query_string:
        # errors='replace' keeps a malformed query from raising before the
        # handler's error handling is in effect.
        url += '?' + query_string.decode('utf-8', errors='replace')
    return url


def _rewrite_redirect_location(location, response_url, target_hostname):
    """Map an upstream Location header onto the proxy's own URL space.

    Redirects that stay on the target host (relative, or absolute with the
    target's hostname) become host-less paths so the browser stays on the
    proxy. Anything else is returned unchanged.
    """
    parsed_location = urlparse(location)
    # "//other-host/x" has no scheme but does name a host, so it is not relative.
    is_relative = not parsed_location.scheme and not parsed_location.netloc
    if not (is_relative or parsed_location.hostname == target_hostname):
        return location

    absolute = urlparse(urljoin(response_url, location))
    # Force exactly one leading slash: an empty path becomes "/", and a path
    # starting with "//" cannot turn into a protocol-relative redirect.
    rewritten = '/' + absolute.path.lstrip('/')
    if absolute.params:
        rewritten += ';' + absolute.params
    if absolute.query:
        rewritten += '?' + absolute.query
    if absolute.fragment:
        rewritten += '#' + absolute.fragment
    return rewritten


@app.route('/', defaults={'path': ''}, methods=_PROXIED_METHODS)
@app.route('/<path:path>', methods=_PROXIED_METHODS)
def proxy_request_handler(path):
    """Forward the incoming request to the configured target and relay the reply.

    Args:
        path: Request path below the proxy root ('' for the root itself).

    Returns:
        A Flask response. Specifically:
        * 500 with a plain-text message when the target URL secret is unset;
        * for upstream 3xx with a Location header, a redirect whose Location
          is rewritten to stay on the proxy when it points at the target host;
        * for upstream status >= 400, the upstream body and status, buffered;
        * otherwise the upstream body streamed in 8 KiB chunks with caching
          disabled (headers only for HEAD);
        * 504 on upstream timeout, 502 on other `requests` failures, and 500
          on any unexpected exception.
    """
    if not TARGET_URL_FROM_SECRET:
        app.logger.critical(f"CRITICAL: Secret '{SECRET_NAME_FOR_TARGET_URL}' is not set.")
        return "Proxy Configuration Error: Target URL secret is not configured.", 500

    target_full_url = _build_target_url(TARGET_URL_FROM_SECRET, path, request.query_string)
    app.logger.debug(f"Proxying request for path: /{path} to {target_full_url}")

    try:
        target_hostname = urlparse(TARGET_URL_FROM_SECRET).hostname

        forward_headers = {
            key: value
            for key, value in request.headers.items()
            if key.lower() not in _EXCLUDED_INCOMING_HEADERS
        }
        forward_headers['User-Agent'] = random.choice(COMMON_USER_AGENTS)
        # No explicit Host header: the incoming one is excluded above, so the
        # HTTP client derives it from the target URL, including any
        # non-default port (which a hostname-only value would lose).
        #
        # Only advertise encodings that are always decodable here. The
        # Content-Encoding header is stripped from the relayed response, so an
        # undecoded brotli body (brotli support is an optional extra) would
        # reach the client as garbage.
        forward_headers['Accept-Encoding'] = 'gzip, deflate'

        # Append this hop to the first X-Forwarded-For header, if any.
        # remote_addr may be None, so empty parts are skipped.
        prior_forwarded = request.headers.getlist("X-Forwarded-For")
        forwarded_chain = [
            part
            for part in (prior_forwarded[0] if prior_forwarded else None, request.remote_addr)
            if part
        ]
        if forwarded_chain:
            forward_headers["X-Forwarded-For"] = ", ".join(forwarded_chain)

        request_body = None
        if request.method not in ('GET', 'HEAD', 'OPTIONS'):
            request_body = request.get_data()

        with requests.Session() as session:
            target_response = session.request(
                method=request.method,
                url=target_full_url,
                headers=forward_headers,
                data=request_body,
                stream=True,
                timeout=REQUEST_TIMEOUT,
                allow_redirects=False,
            )

        status_code = target_response.status_code

        # --- Redirects: rewrite Location so the client stays on the proxy. ---
        if 300 <= status_code < 400 and 'Location' in target_response.headers:
            final_redirect_headers = {
                'Location': _rewrite_redirect_location(
                    target_response.headers['Location'],
                    target_response.url,
                    target_hostname,
                ),
            }
            # NOTE(review): requests exposes repeated headers as one folded
            # value, so several upstream Set-Cookie headers may arrive merged
            # here - confirm whether that matters for this target.
            for h_key, h_val in target_response.headers.items():
                if h_key.lower() in _REDIRECT_PASSTHROUGH_HEADERS:
                    final_redirect_headers[h_key] = h_val
            # The body is never read on this path; release the connection now.
            target_response.close()
            return Response(response=None, status=status_code, headers=final_redirect_headers)

        # --- Upstream errors: relay the buffered body and status. ---
        if status_code >= 400:
            app.logger.warning(f"Target {target_full_url} error: {status_code}")
            error_content = target_response.content  # reads the whole body
            error_headers_from_target = {
                k: v
                for k, v in target_response.headers.items()
                if k.lower() not in _EXCLUDED_ERROR_RESPONSE_HEADERS
            }
            return Response(response=error_content, status=status_code, headers=error_headers_from_target)

        # --- Success: relay headers (minus filtered ones) and stream the body. ---
        content_type = target_response.headers.get('Content-Type', 'application/octet-stream')
        final_response_headers = {
            key: value
            for key, value in target_response.headers.items()
            if key.lower() not in _EXCLUDED_RESPONSE_HEADERS
            and key.lower() not in _OVERRIDDEN_RESPONSE_HEADERS
        }
        final_response_headers['Content-Type'] = content_type
        final_response_headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
        final_response_headers['Pragma'] = 'no-cache'
        final_response_headers['Expires'] = '0'
        final_response_headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        if request.method == 'HEAD':
            # No body will be relayed; release the upstream connection now.
            target_response.close()
            return Response(response=None, status=status_code, headers=final_response_headers)

        def generate_response_content_stream():
            """Yield the upstream body chunk by chunk, then close the response."""
            try:
                # Every chunk is passed through exactly once. HTML is no longer
                # accumulated in a side buffer and re-emitted after the loop,
                # which sent each HTML document twice.
                for chunk_data in target_response.iter_content(chunk_size=8192):
                    if chunk_data:
                        yield chunk_data
            except Exception as stream_err:
                # Headers are already sent at this point, so the failure can
                # only be logged; the client sees a truncated body.
                app.logger.error(f"Error during HTML/Stream processing for {target_full_url}: {stream_err}")
            finally:
                target_response.close()

        return Response(
            stream_with_context(generate_response_content_stream()),
            status=status_code,
            headers=final_response_headers,
        )

    except requests.exceptions.Timeout:
        app.logger.error(f"Timeout ({REQUEST_TIMEOUT}s) fetching {target_full_url}")
        return "Error: Request to target site timed out.", 504
    except requests.exceptions.HTTPError as http_err:
        # HTTPError may carry no response object; guard before inspecting it.
        err_response = http_err.response
        err_status = err_response.status_code if err_response is not None else 'n/a'
        err_snippet = err_response.text[:200] if err_response is not None else ''
        app.logger.error(f"HTTPError {err_status} from {target_full_url}. Resp: {err_snippet}")
        return "HTTP Error from target.", 502
    except requests.exceptions.ConnectionError as conn_err:
        app.logger.error(f"ConnectionError for {target_full_url}: {conn_err}")
        return "Error: Could not connect to target.", 502
    except requests.exceptions.RequestException as req_err:
        app.logger.error(f"RequestException for {target_full_url}: {req_err}")
        return "Error fetching content.", 502
    except Exception:
        app.logger.exception(f"Unexpected Python error proxying {target_full_url}")
        return "Unexpected server error.", 500
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: serve on all interfaces, on $PORT (default 7860).
    port = int(os.environ.get("PORT", 7860))

    # NOTE(review): FLASK_DEBUG=1 enables Werkzeug's interactive debugger while
    # the server is bound to 0.0.0.0 - make sure it is never set on a
    # publicly reachable deployment.
    debug_mode = os.environ.get('FLASK_DEBUG', '0') == '1'

    print(f"INFO: Starting Flask app on host 0.0.0.0, port {port}, debug: {debug_mode}")
    app.run(host='0.0.0.0', port=port, debug=debug_mode)