告別 Hello World!實作檔案上傳 (Streaming) 與 身份驗證 (Interceptor)
各位好!歡迎回到 gRPC 實戰系列的第二篇。
在上一篇《用 Python 搭建你的第一個 gRPC 服務》中,我們成功跑通了最基礎的 Hello World。但你肯定會心想:「現實世界的開發哪有這麼簡單?」
沒錯,在真實的微服務架構中,我們經常會遇到兩個棘手問題:
大資料傳輸:如果要傳送一個 500MB 的影片檔,傳統 REST 可能會逾時,而如果一次性把檔案讀進記憶體再傳,伺服器肯定會 OOM (Out of Memory)。
安全性與中介軟體:我們不希望在每個 API 函數裡都手寫 if check_token(): 這樣的檢查邏輯,這樣既難維護又容易漏掉。
今天我們就來一一擊破,實作 gRPC 的 客戶端串流 (Client Streaming) 與 攔截器 (Interceptor)。
核心概念
-
什麼是 Streaming? gRPC 支援串流傳輸,這意味著我們不需要等待所有資料都準備好才發送。以「檔案上傳」為例,我們可以把一個大檔案切成無數個 64KB 的小區塊 (Chunk),像流水一樣源源不絕地送到 Server。Server 也是邊收邊處理,記憶體永遠只佔用那一小塊的大小。
-
什麼是 Interceptor? 如果你寫過 Web 後端,它就是 Middleware (中介軟體)。它像是一個守門員,擋在客戶端請求和你的業務邏輯之間。我們可以在這裡統一處理 Log 紀錄、身份驗證 (Auth)、或是 錯誤處理。
實作開始
第一步:修改合約 (helloworld.proto) 我們修改一下之前的 .proto 檔,新增一個 UploadImage 方法,並使用 stream 關鍵字。
// helloworld.proto
syntax = "proto3";
package helloworld;
// 定義服務
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
// 注意:輸入參數前加了 'stream',表示這是一個連續的資料流
rpc UploadImage (stream ImageChunk) returns (UploadStatus) {}
}
// 一般請求
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
message ImageChunk {
bytes data = 1;
}
// 上傳狀態 (串流結束後回傳)
message UploadStatus {
string message = 1;
int32 size = 2;
}
記得執行生成程式的指令:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto
第二步:實作 Server 與 攔截器 (server.py)
這裡我們要做兩件事: 寫一個 AuthInterceptor 類別,檢查 Metadata 中的 Token。 在 Greeter 類別中,用 for 迴圈接收串流資料。
import logging
from concurrent import futures
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
class AuthInterceptor(grpc.ServerInterceptor):
def __init__(self, secret_key):
self._secret_key = secret_key
def intercept_service(self, continuation, handler_call_details):
metadata = dict(handler_call_details.invocation_metadata)
token = metadata.get('authorization')
if token != self._secret_key:
def abort(ignored_request, context):
context.abort(grpc.StatusCode.UNAUTHENTICATED, '無效的 Token,請滾蛋!')
return grpc.unary_unary_rpc_method_handler(abort)
return continuation(handler_call_details)
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message=f'Hello, {request.name}! 驗證通過!')
def UploadImage(self, request_iterator, context):
print("--- 開始接收檔案串流 ---")
total_size = 0
data_buffer = bytearray()
# Client 每送一塊資料,這裡就會跑一次迴圈
for chunk in request_iterator:
size = len(chunk.data)
total_size += size
data_buffer.extend(chunk.data)#在真實環境下,應該邊收邊寫入
print(f"收到資料塊: {size} bytes | 目前總計: {total_size} bytes")
print("--- 檔案接收完畢 ---")
with open("server_received_image.dat", "wb") as f:
f.write(data_buffer)
# 回傳最終結果
return helloworld_pb2.UploadStatus(
message="上傳成功",
size=total_size
)
# --- 3. 啟動伺服器 ---
def serve():
# 設定攔截器 (假設密鑰是 "super-secret")
interceptors = [AuthInterceptor('super-secret')]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=interceptors
)
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
print("Server 啟動中 (Port 50051)...")
server.start()
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
第三步:實作 Client 與 生成器 (client.py) 在 Client 端,我們使用 Python 的 yield 語法來建立生成器,實現「邊讀邊傳」。同時,在發起請求時,我們要帶上 metadata 通過驗證。
import logging
import os
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
# 產生一個假的測試檔案
def create_dummy_file(filename, size_in_mb=5):
if not os.path.exists(filename):
print(f"正在建立測試檔案 {filename} ({size_in_mb}MB)...")
with open(filename, "wb") as f:
f.write(os.urandom(1024 * 1024 * size_in_mb))
# --- 檔案切片生成器 (Generator) ---
# 這就是 Streaming 的關鍵!
def generate_chunks(filename, chunk_size=1024 * 64): # 每次讀 64KB
with open(filename, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield helloworld_pb2.ImageChunk(data=chunk)
def run():
# 準備 5MB 的測試檔案
test_filename = "large_test_image.dat"
create_dummy_file(test_filename)
# 建立連線
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
# 定義 Metadata (通關密語)
valid_metadata = (('authorization', 'super-secret'),)
invalid_metadata = (('authorization', 'wrong-password'),)
print("\n=== 測試 1: 身份驗證 (正確 Token) ===")
try:
response = stub.SayHello(
helloworld_pb2.HelloRequest(name='Python大神'),
metadata=valid_metadata # <--- 記得帶上這張票
)
print(f"Server 回應: {response.message}")
except grpc.RpcError as e:
print(f"錯誤: {e.details()}")
print("\n=== 測試 2: 身份驗證 (錯誤 Token) ===")
try:
response = stub.SayHello(
helloworld_pb2.HelloRequest(name='駭客'),
metadata=invalid_metadata
)
print(f"Server 回應: {response.message}")
except grpc.RpcError as e:
print(f"被攔截成功! 錯誤代碼: {e.code()} | 訊息: {e.details()}")
print("\n=== 測試 3: 大檔案串流上傳 ===")
try:
# 注意:這裡我們把 generator 函數直接丟進去
# gRPC 核心庫會自動迭代這個生成器,將資料一塊塊送出
response = stub.UploadImage(
generate_chunks(test_filename),
metadata=valid_metadata
)
print(f"上傳結果: {response.message} | Server 收到大小: {response.size} bytes")
except grpc.RpcError as e:
print(f"上傳失敗: {e.details()}")
if __name__ == '__main__':
logging.basicConfig()
run()
執行結果 開啟終端機運行 Server:
python server.py
開啟另一個終端機運行 Client:
python client.py
你應該會看到 Client 端顯示驗證成功與失敗的測試,並且看到 5MB 的檔案被切成許多小區塊,順利上傳到 Server,而不會瞬間佔用大量記憶體。
總結
透過今天的實戰,我們學會了 gRPC 兩個非常強大的功能:
Streaming:用生成器 (Generator) 模式優雅地傳輸大檔案,解決頻寬與記憶體瓶頸。
Interceptor:用 AOP (剖面導向程式設計) 的思維來統一處理驗證,讓業務邏輯保持乾淨。
掌握了這些,你已經具備了用 gRPC 開發正式生產環境服務的基礎能力了。 下一篇預告:
寫好了程式,在自己的電腦上跑雖然很爽,但要怎麼部署到雲端呢?下一篇我們將進入維運環節:「如何將 Python gRPC 服務 Docker 化,並解決容器內常見的編譯問題」。
0 留言
發表留言