You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
672 lines
26 KiB
672 lines
26 KiB
import eventlet
|
|
eventlet.monkey_patch()
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from flask import Flask, render_template, request, jsonify, send_from_directory
|
|
from flask_socketio import SocketIO, emit
|
|
from flask_cors import CORS
|
|
|
|
app = Flask(__name__)
|
|
app.config['SECRET_KEY'] = 'tnCJ5Gs0pYCRH5XcEkevtz19eueB7gynqEW0cK8s2PYNYJWR9n1b9Be1VXmkwtidtvbHJbRVj9wVfqTCM5cpQLYARoo5EdG9siTL'
|
|
CORS(app)
|
|
socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins="*")
|
|
|
|
# 进程管理
|
|
processes = {
|
|
'send_frpc': None,
|
|
'send_p2p': None,
|
|
'receive_frpc': None,
|
|
'receive_p2p': None
|
|
}
|
|
|
|
is_running = {
|
|
'send': False,
|
|
'receive': False
|
|
}
|
|
|
|
# 允许访问的基础目录
|
|
ALLOWED_BASE_DIR = "/mnt/wwn-0x5000c500a34676a5-part1/docker/kodbox/data/files/"
|
|
|
|
def is_path_allowed(path):
|
|
"""
|
|
检查路径是否在允许访问的目录范围内
|
|
"""
|
|
# 规范化路径
|
|
normalized_path = os.path.abspath(os.path.normpath(path))
|
|
normalized_base = os.path.abspath(os.path.normpath(ALLOWED_BASE_DIR))
|
|
|
|
# 检查路径是否以基础目录开头
|
|
return normalized_path.startswith(normalized_base)
|
|
|
|
def get_safe_path(requested_path):
|
|
"""
|
|
获取安全的路径,如果不在允许范围内则返回基础目录
|
|
"""
|
|
if is_path_allowed(requested_path):
|
|
return requested_path
|
|
return ALLOWED_BASE_DIR
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
# 搜索文件路由 - 添加目录访问限制和支持*通配符
|
|
@app.route('/search/files', methods=['POST'])
|
|
def search_files():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
requested_path = data.get('path', ALLOWED_BASE_DIR)
|
|
search_pattern = data.get('pattern', '').strip()
|
|
max_results = data.get('max_results', 100)
|
|
|
|
# 确保路径安全
|
|
search_path = get_safe_path(requested_path)
|
|
|
|
# 如果请求的路径是文件而不是目录,取其所在目录
|
|
if os.path.isfile(search_path):
|
|
search_path = os.path.dirname(search_path)
|
|
search_path = get_safe_path(search_path)
|
|
|
|
# 确保路径存在且是目录
|
|
if not os.path.exists(search_path):
|
|
search_path = ALLOWED_BASE_DIR
|
|
|
|
if not os.path.isdir(search_path):
|
|
search_path = os.path.dirname(search_path) if os.path.isfile(search_path) else ALLOWED_BASE_DIR
|
|
|
|
# 确保max_results是整数
|
|
try:
|
|
max_results = int(max_results)
|
|
except (ValueError, TypeError):
|
|
max_results = 100
|
|
|
|
if not search_pattern:
|
|
return jsonify({'status': 'error', 'message': '请输入搜索关键词'})
|
|
|
|
if not os.path.exists(search_path):
|
|
return jsonify({'status': 'error', 'message': '搜索路径不存在'})
|
|
|
|
# 检查搜索路径是否在允许范围内
|
|
if not is_path_allowed(search_path):
|
|
return jsonify({'status': 'error', 'message': '无权访问该搜索路径'})
|
|
|
|
results = []
|
|
search_count = 0
|
|
|
|
# 判断是否是通配符搜索(显示所有文件)
|
|
show_all_files = (search_pattern == '*')
|
|
|
|
if show_all_files:
|
|
# 显示当前目录下的所有文件(不递归)
|
|
try:
|
|
for item in os.listdir(search_path):
|
|
item_path = os.path.join(search_path, item)
|
|
|
|
# 检查路径是否允许访问
|
|
if not is_path_allowed(item_path):
|
|
continue
|
|
|
|
# 只处理文件,不处理目录
|
|
if os.path.isfile(item_path):
|
|
try:
|
|
file_size = os.path.getsize(item_path) if os.path.exists(item_path) else 0
|
|
|
|
results.append({
|
|
'name': item,
|
|
'path': item_path,
|
|
'directory': search_path,
|
|
'relative_path': item, # 相对路径就是文件名
|
|
'size': file_size
|
|
})
|
|
|
|
search_count += 1
|
|
if search_count >= max_results:
|
|
break
|
|
except (OSError, PermissionError):
|
|
# 跳过无法访问的文件
|
|
continue
|
|
except (OSError, PermissionError):
|
|
return jsonify({'status': 'error', 'message': '无法访问该目录'})
|
|
else:
|
|
# 正常递归搜索文件
|
|
for root, dirs, files in os.walk(search_path):
|
|
# 检查当前目录是否在允许范围内
|
|
if not is_path_allowed(root):
|
|
continue # 跳过不在允许范围内的目录
|
|
|
|
for file in files:
|
|
if search_pattern.lower() in file.lower():
|
|
file_path = os.path.join(root, file)
|
|
|
|
# 再次检查文件路径是否允许
|
|
if not is_path_allowed(file_path):
|
|
continue # 跳过不在允许范围内的文件
|
|
|
|
try:
|
|
file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
|
|
relative_path = os.path.relpath(file_path, search_path)
|
|
|
|
results.append({
|
|
'name': file,
|
|
'path': file_path,
|
|
'directory': root,
|
|
'relative_path': relative_path,
|
|
'size': file_size
|
|
})
|
|
|
|
search_count += 1
|
|
if search_count >= max_results:
|
|
break
|
|
except (OSError, PermissionError):
|
|
# 跳过无法访问的文件
|
|
continue
|
|
|
|
if search_count >= max_results:
|
|
break
|
|
|
|
# 按文件名排序
|
|
results.sort(key=lambda x: x['name'].lower())
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'results': results,
|
|
'count': len(results),
|
|
'search_path': search_path,
|
|
'base_dir': ALLOWED_BASE_DIR,
|
|
'is_all_files': show_all_files # 标识是否是显示所有文件
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': f'搜索失败: {str(e)}'})
|
|
|
|
|
|
# 浏览文件路由 - 添加目录访问限制和文件路径处理
|
|
@app.route('/browse', methods=['POST'])
|
|
def browse_files():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
requested_path = data.get('path', ALLOWED_BASE_DIR)
|
|
file_type = data.get('type', 'file')
|
|
|
|
# 检查请求的路径是否是文件而不是目录
|
|
if os.path.isfile(requested_path):
|
|
# 如果是文件,取其所在目录
|
|
current_path = os.path.dirname(requested_path)
|
|
# 确保路径安全
|
|
current_path = get_safe_path(current_path)
|
|
else:
|
|
# 如果是目录,确保路径安全
|
|
current_path = get_safe_path(requested_path)
|
|
|
|
# 确保路径存在
|
|
if not os.path.exists(current_path):
|
|
current_path = ALLOWED_BASE_DIR
|
|
|
|
# 确保最终路径是目录
|
|
if not os.path.isdir(current_path):
|
|
current_path = os.path.dirname(current_path) if os.path.isfile(current_path) else ALLOWED_BASE_DIR
|
|
|
|
items = []
|
|
try:
|
|
# 获取目录内容
|
|
for item in os.listdir(current_path):
|
|
item_path = os.path.join(current_path, item)
|
|
|
|
# 检查路径是否允许访问
|
|
if not is_path_allowed(item_path):
|
|
continue
|
|
|
|
try:
|
|
# 根据请求的类型过滤
|
|
if os.path.isfile(item_path) and file_type == 'file':
|
|
items.append({
|
|
'name': item,
|
|
'path': item_path,
|
|
'type': 'file',
|
|
'size': os.path.getsize(item_path) if os.path.exists(item_path) else 0
|
|
})
|
|
elif os.path.isdir(item_path) and file_type == 'folder':
|
|
items.append({
|
|
'name': item,
|
|
'path': item_path,
|
|
'type': 'folder'
|
|
})
|
|
except (OSError, PermissionError):
|
|
# 跳过无法访问的文件/目录
|
|
continue
|
|
except (OSError, PermissionError):
|
|
return jsonify({'status': 'error', 'message': '无法访问该目录'})
|
|
|
|
# 按类型和名称排序(文件夹在前,文件在后)
|
|
items.sort(key=lambda x: (x['type'] != 'folder', x['name'].lower()))
|
|
|
|
# 获取当前选中项(如果是文件浏览模式且传入的是文件路径)
|
|
selected_item = None
|
|
if os.path.isfile(requested_path) and file_type == 'file' and is_path_allowed(requested_path):
|
|
selected_item = os.path.basename(requested_path)
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'items': items,
|
|
'current_path': current_path,
|
|
'base_dir': ALLOWED_BASE_DIR,
|
|
'selected_item': selected_item # 返回当前选中的文件名(如果有)
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': str(e)})
|
|
|
|
|
|
# 发送连接路由
|
|
@app.route('/send/connect', methods=['POST'])
|
|
def send_connect():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
key = data.get('key', '').strip()
|
|
|
|
if not key:
|
|
return jsonify({'status': 'error', 'message': '请输入密钥'})
|
|
|
|
if '|' not in key:
|
|
return jsonify({'status': 'error', 'message': '密钥格式应为: token|sk|psk|sern'})
|
|
|
|
parts = key.split('|')
|
|
if len(parts) < 4:
|
|
return jsonify({'status': 'error', 'message': '密钥格式应为: token|sk|psk|sern'})
|
|
|
|
token, sk, psk, sern = [part.strip() for part in parts]
|
|
|
|
# 创建send.ini配置文件
|
|
ini_content = f"""[common]
|
|
local_ip = 0.0.0.0
|
|
server_addr = 47.97.6.201
|
|
server_port = 7100
|
|
token = {token}
|
|
|
|
[xtcp_visitor]
|
|
type = xtcp
|
|
role = visitor
|
|
server_name = {sern}
|
|
sk = {sk}
|
|
bind_ip = 127.0.0.1
|
|
bind_port = 7001
|
|
"""
|
|
|
|
try:
|
|
with open('./send.ini', 'w', encoding='utf-8') as f:
|
|
f.write(ini_content)
|
|
socketio.emit('send_output', {'message': '配置文件 send.ini 创建成功\n'})
|
|
except Exception as e:
|
|
socketio.emit('send_output', {'message': f'创建配置文件失败: {e}\n'})
|
|
return jsonify({'status': 'error', 'message': f'创建配置文件失败: {e}'})
|
|
|
|
# 启动frpc进程
|
|
def run_send_frpc():
|
|
try:
|
|
socketio.emit('send_output', {'message': '[NATC] 正在启动NATC客户端...\n'})
|
|
socketio.emit('send_status', {'status': '启动中...'})
|
|
|
|
processes['send_frpc'] = subprocess.Popen(
|
|
['/usr/bin/frpc/frpc', '-c', './send.ini'],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
encoding='utf-8',
|
|
errors='replace',
|
|
bufsize=1
|
|
)
|
|
|
|
# 监控输出
|
|
for line in iter(processes['send_frpc'].stdout.readline, ''):
|
|
if not is_running['send']:
|
|
break
|
|
if line:
|
|
socketio.emit('send_output', {'message': f'[NATC] {line}'})
|
|
|
|
# 进程结束后的处理
|
|
return_code = processes['send_frpc'].wait()
|
|
if is_running['send']:
|
|
socketio.emit('send_output', {'message': f'[NATC] NATC客户端已停止,退出码: {return_code}\n'})
|
|
socketio.emit('send_status', {'status': '已停止'})
|
|
is_running['send'] = False
|
|
|
|
except Exception as e:
|
|
socketio.emit('send_output', {'message': f'[NATC] 执行错误: {e}\n'})
|
|
socketio.emit('send_status', {'status': '错误'})
|
|
is_running['send'] = False
|
|
|
|
is_running['send'] = True
|
|
thread = threading.Thread(target=run_send_frpc)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# 等待一段时间让frpc建立连接
|
|
time.sleep(3)
|
|
|
|
return jsonify({'status': 'success', 'message': '连接已启动'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': f'连接失败: {str(e)}'})
|
|
|
|
# 发送文件路由
|
|
@app.route('/send/file', methods=['POST'])
|
|
def send_file():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
if not is_running['send']:
|
|
return jsonify({'status': 'error', 'message': '请先启动连接'})
|
|
|
|
file_path = data.get('file_path', '').strip()
|
|
psk = data.get('psk', '').strip()
|
|
|
|
if not file_path or not os.path.exists(file_path):
|
|
return jsonify({'status': 'error', 'message': '文件路径无效或文件不存在'})
|
|
|
|
if not psk:
|
|
return jsonify({'status': 'error', 'message': 'PSK不能为空'})
|
|
|
|
# 启动文件发送进程
|
|
def run_send_file():
|
|
try:
|
|
socketio.emit('send_output', {'message': f'[P2P] 开始发送文件: {os.path.basename(file_path)}\n'})
|
|
socketio.emit('send_status', {'status': '正在发送文件...'})
|
|
|
|
processes['send_p2p'] = subprocess.Popen(
|
|
[
|
|
'python3',
|
|
'../p2pfile.py',
|
|
'--mode', 'send',
|
|
'--path', file_path,
|
|
'--psk', psk
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
encoding='utf-8',
|
|
errors='replace',
|
|
bufsize=1
|
|
)
|
|
|
|
# 监控输出
|
|
for line in iter(processes['send_p2p'].stdout.readline, ''):
|
|
if not is_running['send']:
|
|
break
|
|
if line.strip():
|
|
socketio.emit('send_output', {'message': f'[P2P] {line}'})
|
|
|
|
# 进程结束后的处理
|
|
return_code = processes['send_p2p'].wait()
|
|
if is_running['send']:
|
|
socketio.emit('send_output', {'message': f'[P2P] 文件发送完成,退出码: {return_code}\n'})
|
|
socketio.emit('send_status', {'status': '发送完成'})
|
|
|
|
except Exception as e:
|
|
socketio.emit('send_output', {'message': f'[P2P] 执行错误: {e}\n'})
|
|
socketio.emit('send_status', {'status': '发送错误'})
|
|
|
|
thread = threading.Thread(target=run_send_file)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
return jsonify({'status': 'success', 'message': '文件发送已启动'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': str(e)})
|
|
|
|
# 停止发送路由
|
|
@app.route('/send/stop', methods=['POST'])
|
|
def send_stop():
|
|
try:
|
|
if is_running['send']:
|
|
is_running['send'] = False
|
|
|
|
# 停止frpc进程
|
|
if processes['send_frpc'] and processes['send_frpc'].poll() is None:
|
|
try:
|
|
processes['send_frpc'].terminate()
|
|
socketio.emit('send_output', {'message': '[NATC] 正在停止NATC客户端...\n'})
|
|
except:
|
|
pass
|
|
|
|
# 停止P2P进程
|
|
if processes['send_p2p'] and processes['send_p2p'].poll() is None:
|
|
try:
|
|
processes['send_p2p'].terminate()
|
|
socketio.emit('send_output', {'message': '[P2P] 正在停止文件发送...\n'})
|
|
except:
|
|
pass
|
|
|
|
time.sleep(1)
|
|
socketio.emit('send_status', {'status': '已停止'})
|
|
socketio.emit('send_output', {'message': '所有进程已停止\n'})
|
|
|
|
return jsonify({'status': 'success', 'message': '已停止发送进程'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': str(e)})
|
|
|
|
# 接收连接路由
|
|
@app.route('/receive/connect', methods=['POST'])
|
|
def receive_connect():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
if is_running['receive']:
|
|
return jsonify({'status': 'error', 'message': '接收进程已在运行'})
|
|
|
|
key = data.get('key', '').strip()
|
|
save_path = "/mnt/wwn-0x5000c500a34676a5-part1/docker/kodbox/data/files/下载/" #data.get('save_path', './received_files').strip()
|
|
|
|
if not key:
|
|
return jsonify({'status': 'error', 'message': '请输入密钥'})
|
|
|
|
if '|' not in key:
|
|
return jsonify({'status': 'error', 'message': '密钥格式应为: token|sk|psk|sern'})
|
|
|
|
parts = key.split('|')
|
|
if len(parts) < 4:
|
|
return jsonify({'status': 'error', 'message': '密钥格式应为: token|sk|psk|sern'})
|
|
|
|
token, sk, psk, sern = [part.strip() for part in parts]
|
|
|
|
# 创建service.ini配置文件
|
|
ini_content = f"""[common]
|
|
server_addr = 47.97.6.201
|
|
server_port = 7100
|
|
token = {token}
|
|
|
|
[{sern}]
|
|
type = xtcp
|
|
sk = {sk}
|
|
local_ip = 127.0.0.1
|
|
local_port = 6000
|
|
"""
|
|
|
|
try:
|
|
with open('./service.ini', 'w', encoding='utf-8') as f:
|
|
f.write(ini_content)
|
|
socketio.emit('receive_output', {'message': '配置文件 service.ini 创建成功\n'})
|
|
except Exception as e:
|
|
socketio.emit('receive_output', {'message': f'创建配置文件失败: {e}\n'})
|
|
return jsonify({'status': 'error', 'message': f'创建配置文件失败: {e}'})
|
|
|
|
# 启动frpc进程
|
|
def run_receive_frpc():
|
|
try:
|
|
socketio.emit('receive_output', {'message': '[NATC] 正在启动NATC客户端...\n'})
|
|
socketio.emit('receive_status', {'status': '启动中...'})
|
|
|
|
processes['receive_frpc'] = subprocess.Popen(
|
|
['/usr/bin/frpc/frpc', '-c', './service.ini'],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
encoding='utf-8',
|
|
errors='replace',
|
|
bufsize=1
|
|
)
|
|
|
|
# 监控输出
|
|
for line in iter(processes['receive_frpc'].stdout.readline, ''):
|
|
if not is_running['receive']:
|
|
break
|
|
if line:
|
|
socketio.emit('receive_output', {'message': f'[NATC] {line}'})
|
|
|
|
# 进程结束后的处理
|
|
return_code = processes['receive_frpc'].wait()
|
|
if is_running['receive']:
|
|
socketio.emit('receive_output', {'message': f'[NATC] NATC客户端已停止,退出码: {return_code}\n'})
|
|
socketio.emit('receive_status', {'status': '已停止'})
|
|
is_running['receive'] = False
|
|
|
|
except Exception as e:
|
|
socketio.emit('receive_output', {'message': f'[NATC] 执行错误: {e}\n'})
|
|
socketio.emit('receive_status', {'status': '错误'})
|
|
is_running['receive'] = False
|
|
|
|
is_running['receive'] = True
|
|
thread = threading.Thread(target=run_receive_frpc)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
# 等待一段时间让frpc建立连接,然后启动P2P接收
|
|
time.sleep(3)
|
|
|
|
# 启动P2P接收进程
|
|
def run_receive_p2p():
|
|
try:
|
|
# 创建目录(如果不存在)
|
|
os.makedirs(save_path, exist_ok=True)
|
|
|
|
socketio.emit('receive_output', {'message': '[P2P] 启动P2P文件接收器...\n'})
|
|
socketio.emit('receive_output', {'message': f'[P2P] 使用PSK: {psk}\n'})
|
|
socketio.emit('receive_output', {'message': f'[P2P] 文件保存路径: {save_path}\n'})
|
|
socketio.emit('receive_status', {'status': '正在接收文件...'})
|
|
|
|
processes['receive_p2p'] = subprocess.Popen(
|
|
[
|
|
'python3',
|
|
'../p2pfile.py',
|
|
'--mode', 'recv',
|
|
'--psk', psk,
|
|
'--outdir', save_path
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
encoding='utf-8',
|
|
errors='replace',
|
|
bufsize=1
|
|
)
|
|
|
|
# 监控输出
|
|
for line in iter(processes['receive_p2p'].stdout.readline, ''):
|
|
if not is_running['receive']:
|
|
break
|
|
if line.strip():
|
|
socketio.emit('receive_output', {'message': f'[P2P] {line}'})
|
|
|
|
# 进程结束后的处理
|
|
return_code = processes['receive_p2p'].wait()
|
|
if is_running['receive']:
|
|
socketio.emit('receive_output', {'message': f'[P2P] P2P接收器已停止,退出码: {return_code}\n'})
|
|
socketio.emit('receive_status', {'status': '接收完成'})
|
|
|
|
except Exception as e:
|
|
socketio.emit('receive_output', {'message': f'[P2P] 执行错误: {e}\n'})
|
|
socketio.emit('receive_status', {'status': '接收错误'})
|
|
|
|
p2p_thread = threading.Thread(target=run_receive_p2p)
|
|
p2p_thread.daemon = True
|
|
p2p_thread.start()
|
|
|
|
return jsonify({'status': 'success', 'message': '接收已启动'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': str(e)})
|
|
|
|
# 停止接收路由
|
|
@app.route('/receive/stop', methods=['POST'])
|
|
def receive_stop():
|
|
try:
|
|
if is_running['receive']:
|
|
is_running['receive'] = False
|
|
|
|
# 停止frpc进程
|
|
if processes['receive_frpc'] and processes['receive_frpc'].poll() is None:
|
|
try:
|
|
processes['receive_frpc'].terminate()
|
|
socketio.emit('receive_output', {'message': '[NATC] 正在停止NATC客户端...\n'})
|
|
except:
|
|
pass
|
|
|
|
# 停止P2P进程
|
|
if processes['receive_p2p'] and processes['receive_p2p'].poll() is None:
|
|
try:
|
|
processes['receive_p2p'].terminate()
|
|
socketio.emit('receive_output', {'message': '[P2P] 正在停止文件接收...\n'})
|
|
except:
|
|
pass
|
|
|
|
time.sleep(1)
|
|
socketio.emit('receive_status', {'status': '已停止'})
|
|
socketio.emit('receive_output', {'message': '所有进程已停止\n'})
|
|
|
|
return jsonify({'status': 'success', 'message': '已停止接收进程'})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': str(e)})
|
|
|
|
# 检查文件信息路由
|
|
@app.route('/check/file', methods=['POST'])
|
|
def check_file():
|
|
try:
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'status': 'error', 'message': '无效的JSON数据'})
|
|
|
|
file_path = data.get('file_path', '').strip()
|
|
|
|
if not file_path:
|
|
return jsonify({'status': 'error', 'message': '文件路径不能为空'})
|
|
|
|
if not os.path.exists(file_path):
|
|
return jsonify({'status': 'error', 'message': '文件不存在'})
|
|
|
|
if not os.path.isfile(file_path):
|
|
return jsonify({'status': 'error', 'message': '路径不是文件'})
|
|
|
|
# 获取文件信息
|
|
file_stats = os.stat(file_path)
|
|
|
|
return jsonify({
|
|
'status': 'success',
|
|
'size': file_stats.st_size,
|
|
'mtime': file_stats.st_mtime,
|
|
'ctime': file_stats.st_ctime,
|
|
'is_file': True
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'status': 'error', 'message': f'检查文件失败: {str(e)}'})
|
|
if __name__ == '__main__':
|
|
print("启动蜗牛文件传输Web服务...")
|
|
print(f"允许访问的目录: {ALLOWED_BASE_DIR}")
|
|
print("访问地址: http://0.0.0.0:5054")
|
|
socketio.run(app, debug=True, host='0.0.0.0', port=5054)
|