文章内容上次编辑时间于 1 month ago。文章内容已经有一段时间没有更新了,也许不再适用!
文章共 2141 字,阅读完预计需要 3 分钟 35 秒。文章篇幅适中,可以放心阅读。
PyLiteFlux
PyLiteFlux 是一个轻量级的Python Web/TCP服务器框架,完全基于Python标准库实现,不依赖任何第三方包。它支持HTTP和TCP协议。适合用于学习和理解Python后端框架的概念和实现原理。 代码地址
特性
- 支持HTTP和TCP服务器
- 支持TCP长连接和消息推送
- 支持后台任务处理
- 内置中间件系统
- 简单的路由注册
- 支持配置管理
- 支持请求上下文
安装
pip install pyliteflux
快速开始
基本使用
from pyliteflux import Server, Route
server = Server()
@Route.register("/hello", method="GET")
def hello(request):
return {"message": "Hello, World!"}
server.run()
TCP长连接和消息推送示例
下面是一个简单的聊天系统示例,展示了如何使用TCP长连接和消息推送功能:
from pyliteflux import Server, Route
import time
server = Server()
server.config.tcp_keep_alive = True # 启用TCP长连接
# HTTP接口:发送消息给指定客户端
@Route.register("/hello", method="GET")
def hello(request):
# 从查询参数中获取目标client_id
target_client = request.query_params.get("client_id")
def heavy_task():
time.sleep(5) # 模拟耗时操作
server.push_message("Hello via HTTP GET!", target_client)
request.bg.add(heavy_task)
return {
"message": "Hello via HTTP GET!",
"target_client": target_client,
"query_params": request.query_params
}
# TCP接口:列出所有活跃连接
@Route.register("clients", protocol="tcp")
def list_clients(request):
clients = list(server._active_connections.keys())
return {
"message": "Active TCP clients",
"clients": clients,
"count": len(clients)
}
# TCP接口:订阅消息
@Route.register("subscribe", protocol="tcp")
def subscribe_tcp(request):
client_id = f"{request.client_address[0]}:{request.client_address[1]}"
return {"message": "subscribed", "client_id": client_id}
server.run()
主要功能
HTTP服务器
- 支持GET、POST、PUT、DELETE方法
- 支持路由参数
- 支持查询参数
- 支持JSON和表单数据
TCP服务器
- 支持长连接
- 支持消息推送
- 支持JSON和XML数据格式
- 支持自定义命令
后台任务
- 支持异步任务处理
- 任务队列管理
- 线程池执行
中间件系统
- 请求处理中间件
- 响应处理中间件
- 上下文管理中间件
中间件和全局上下文示例
中间件使用
中间件可以处理请求和响应,实现认证、日志、性能监控等功能:
from pyliteflux import Server, Route, Middleware
# 定义认证中间件
class AuthMiddleware(Middleware):
def process_request(self, request):
# 检查认证token
token = request.headers.get("Authorization")
if not token:
return {"error": "Unauthorized"}
# 将认证信息存储在g对象中
g.user = self.validate_token(token)
return None # 继续处理请求
def process_response(self, request, response):
# 处理响应
if isinstance(response, dict):
response["authenticated"] = hasattr(g, "user")
return response
def validate_token(self, token):
# 实际项目中这里应该验证token
return {"id": 1, "name": "user1"}
# 定义日志中间件
class LogMiddleware(Middleware):
def process_request(self, request):
# 记录请求开始时间
g.start_time = time.time()
return None
def process_response(self, request, response):
# 计算请求处理时间
duration = time.time() - g.start_time
Logger.info(f"Request processed in {duration:.2f}s")
return response
# 使用中间件
server = Server()
server.middleware_manager.add_middleware(AuthMiddleware())
server.middleware_manager.add_middleware(LogMiddleware())
# 在路由中使用g对象
@Route.register("/user", method="GET")
def get_user(request):
if hasattr(g, "user"):
return {
"message": "User info",
"user": g.user,
"request_id": g.request_id # 每个请求的唯一ID
}
return {"error": "User not found"}
server.run()
全局上下文使用
g对象是请求级别的全局对象,可以在整个请求过程中共享数据:
from pyliteflux import Server, Route, g
server = Server()
# 在中间件中设置数据
class DataMiddleware(Middleware):
def process_request(self, request):
g.db = Database() # 假设的数据库连接
g.cache = Cache() # 假设的缓存连接
return None
def process_response(self, request, response):
# 清理资源
if hasattr(g, "db"):
g.db.close()
if hasattr(g, "cache"):
g.cache.close()
return response
# 在路由中使用g对象
@Route.register("/data", method="GET")
def get_data(request):
# 使用g对象中的资源
data = g.db.query("SELECT * FROM users")
cached_data = g.cache.get("users")
return {
"data": data,
"cached": cached_data,
"request_id": g.request_id
}
# 在后台任务中使用g对象
@Route.register("/async", method="GET")
def async_task(request):
def background_job():
# 注意:后台任务有自己的g对象实例
g.task_id = uuid.uuid4()
Logger.info(f"Background task {g.task_id} started")
request.bg.add(background_job)
return {"message": "Task scheduled"}
server.run()
实际应用场景
- 用户认证和授权:
class AuthMiddleware(Middleware):
def process_request(self, request):
token = request.headers.get("Authorization")
if token:
g.user = self.authenticate(token)
g.permissions = self.get_permissions(g.user)
return None
@Route.register("/admin", method="GET")
def admin_panel(request):
if not hasattr(g, "permissions") or "admin" not in g.permissions:
return {"error": "Access denied"}, 403
return {"message": "Welcome, admin!"}
- 性能监控:
class PerformanceMiddleware(Middleware):
def process_request(self, request):
g.start_time = time.time()
g.sql_queries = []
return None
def process_response(self, request, response):
duration = time.time() - g.start_time
metrics = {
"duration": duration,
"sql_queries": len(g.sql_queries),
"memory_usage": self.get_memory_usage()
}
Logger.info(f"Performance metrics: {metrics}")
return response
- 请求跟踪:
class TracingMiddleware(Middleware):
def process_request(self, request):
g.trace_id = str(uuid.uuid4())
g.span_id = str(uuid.uuid4())
g.traces = []
return None
def process_response(self, request, response):
if isinstance(response, dict):
response["trace_id"] = g.trace_id
response["traces"] = g.traces
return response
@Route.register("/trace", method="GET")
def traced_endpoint(request):
g.traces.append({"event": "processing", "time": time.time()})
result = process_data()
g.traces.append({"event": "completed", "time": time.time()})
return {"data": result}
这些示例展示了中间件和g对象的强大功能:
- 请求前后的处理
- 资源的自动管理
- 请求级别的数据共享
- 性能监控和日志记录
- 认证和授权
- 请求跟踪和调试
中间件和g对象的组合使用可以大大提高开发效率和代码质量。
配置选项
server = Server()
server.config.update(
http_host="127.0.0.1",
http_port=8000,
tcp_host="127.0.0.1",
tcp_port=8001,
tcp_keep_alive=True, # 启用TCP长连接
info=True # 启用日志
)
API文档
路由装饰器
@Route.register(path, method="GET", protocol="http")
- path: 路由路径
- method: HTTP方法(GET/POST/PUT/DELETE)
- protocol: 协议类型(http/tcp)
服务器方法
server.run() # 启动服务器
server.stop() # 停止服务器
server.push_message(message, client_id=None) # 推送消息
请求对象
request.query_params # 查询参数
request.form_data # 表单数据
request.json_data # JSON数据
request.headers # 请求头
request.bg.add(task) # 添加后台任务
响应格式
PyLiteFlux支持多种响应格式,包括普通JSON响应、流式响应和自定义JSON响应:
1. 普通JSON响应
@Route.register("/hello", method="GET")
def hello(request):
return {"message": "Hello, World!"} # 自动转换为JSON响应
2. 流式响应
流式响应适用于大数据传输或需要实时推送数据的场景:
from pyliteflux import StreamResponse
@Route.register("/stream", method="GET")
def stream_data(request):
def generate_data():
for i in range(10):
time.sleep(1) # 模拟数据生成
yield f"data: {i}\n\n"
return StreamResponse(
generate_data(),
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
# 带参数的流式响应
@Route.register("/download", method="GET")
def download_file(request):
def file_reader():
with open("large_file.txt", "rb") as f:
while chunk := f.read(8192):
yield chunk
return StreamResponse(
file_reader(),
content_type="application/octet-stream",
headers={
"Content-Disposition": "attachment; filename=large_file.txt"
}
)
3. HTTPJSONResponse响应
当需要更细粒度地控制JSON响应时,可以使用HTTPJSONResponse:
from pyliteflux import HTTPJSONResponse
@Route.register("/api/user", method="GET")
def get_user(request):
user_data = {"id": 1, "name": "John"}
return HTTPJSONResponse(
data=user_data,
status=200,
headers={
"X-Custom-Header": "value",
"Access-Control-Allow-Origin": "*"
}
)
# 错误响应示例
@Route.register("/api/error", method="GET")
def error_response(request):
return HTTPJSONResponse(
data={"error": "Not Found", "code": "404"},
status=404,
headers={"X-Error-Type": "NotFound"}
)
# 自定义序列化示例
@Route.register("/api/custom", method="GET")
def custom_serialization(request):
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
data = {
"timestamp": datetime.now(),
"message": "Custom serialization"
}
return HTTPJSONResponse(
data=data,
json_encoder=CustomEncoder,
headers={"Content-Type": "application/json"}
)
4. 组合使用示例
@Route.register("/api/complex", method="GET")
def complex_response(request):
response_type = request.query_params.get("type", "json")
if response_type == "stream":
def stream():
for i in range(5):
yield json.dumps({"count": i}) + "\n"
return StreamResponse(stream(), content_type="application/x-ndjson")
elif response_type == "error":
return HTTPJSONResponse(
{"error": "Bad Request"},
status=400,
headers={"X-Error-Details": "Invalid type"}
)
else:
return {"message": "Regular JSON response"} # 自动转换为JSON
参数使用说明
服务器配置参数
服务器实例支持多种配置参数,可以通过config对象进行设置:
from pyliteflux import Server
server = Server()
# 方式1:通过属性设置
server.config.http_host = "127.0.0.1"
server.config.http_port = 8000
server.config.tcp_keep_alive = True
# 方式2:通过update方法批量设置
server.config.update(
http_host="127.0.0.1",
http_port=8000,
tcp_host="127.0.0.1",
tcp_port=8001,
tcp_keep_alive=True,
info=True
)
配置参数说明:
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
http_host | str | "127.0.0.1" | HTTP服务器主机地址 |
http_port | int | 8000 | HTTP服务器端口 |
tcp_host | str | "127.0.0.1" | TCP服务器主机地址 |
tcp_port | int | 8001 | TCP服务器端口 |
tcp_keep_alive | bool | False | 是否启用TCP长连接 |
info | bool | True | 是否启用日志输出 |
路由参数
路由装饰器支持多种参数配置:
from pyliteflux import Route
# HTTP路由
@Route.register("/hello", method="GET") # 基本GET请求
@Route.register("/user", method="POST") # POST请求
@Route.register("/user/{id}", method="GET") # 带路径参数的路由
@Route.register("/files/*", method="GET") # 通配符路由
# TCP路由
@Route.register("command", protocol="tcp") # TCP命令路由
路由参数说明:
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
path | str | 必填 | 路由路径 |
method | str | "GET" | HTTP方法(GET/POST/PUT/DELETE) |
protocol | str | "http" | 协议类型(http/tcp) |
请求参数
HTTP请求可以通过多种方式传递参数:
# 1. 查询参数
@Route.register("/search", method="GET")
def search(request):
keyword = request.query_params.get("keyword")
page = request.query_params.get("page", "1")
return {"keyword": keyword, "page": page}
# 2. 路径参数
@Route.register("/user/{user_id}/posts/{post_id}", method="GET")
def get_post(request):
user_id = request.path_params["user_id"]
post_id = request.path_params["post_id"]
return {"user_id": user_id, "post_id": post_id}
# 3. 表单数据
@Route.register("/upload", method="POST")
def upload(request):
file_name = request.form_data.get("file_name")
content = request.form_data.get("content")
return {"file_name": file_name}
# 4. JSON数据
@Route.register("/api/data", method="POST")
def handle_json(request):
data = request.json_data
return {"received": data}
TCP连接参数
TCP连接相关的参数和使用方式:
# 1. 启用TCP长连接
server = Server()
server.config.tcp_keep_alive = True
# 2. 发送消息给特定客户端
@Route.register("/push", method="GET")
def push_message(request):
client_id = request.query_params.get("client_id")
message = request.query_params.get("message", "Hello!")
server.push_message(message, client_id)
return {"status": "sent"}
# 3. 广播消息给所有客户端
@Route.register("/broadcast", method="GET")
def broadcast(request):
message = request.query_params.get("message", "Broadcast!")
server.push_message(message) # 不指定client_id则广播
return {"status": "broadcasted"}
中间件参数
中间件可以通过process_request和process_response方法处理请求和响应:
class CustomMiddleware(Middleware):
def __init__(self, **options):
self.options = options
def process_request(self, request):
# 处理请求
request.custom_attr = self.options.get("custom_attr")
return None
def process_response(self, request, response):
# 处理响应
if isinstance(response, dict):
response["processed_by"] = self.options.get("name", "CustomMiddleware")
return response
# 使用中间件
server = Server()
server.middleware_manager.add_middleware(
CustomMiddleware(
name="MyMiddleware",
custom_attr="custom_value"
)
)
后台任务参数
后台任务支持参数传递:
@Route.register("/task", method="GET")
def async_task(request):
def background_job(user_id, task_type="default"):
time.sleep(5)
Logger.info(f"Processing {task_type} task for user {user_id}")
# 添加带参数的后台任务
user_id = request.query_params.get("user_id", "0")
request.bg.add(
background_job,
user_id, # 位置参数
task_type="important" # 关键字参数
)
return {"message": "Task scheduled"}
这些参数的使用示例展示了框架的灵活性和功能性。你可以根据需要组合使用这些参数来实现各种功能。所有参数都有合理的默认值,使框架既易于使用又足够灵活。
需要注意的是:
- 参数名称区分大小写
- 必填参数没有默认值时必须提供
- 某些参数组合可能会相互影响
- 建议遵循Python的命名规范
注意事项
- TCP长连接需要显式启用:
server.config.tcp_keep_alive = True
- 后台任务会在独立的线程池中
- 推送消息时如果不指定client_id,将广播给所有连接的客户端
许可证
MIT License