告別 Hello World!實作檔案上傳 (Streaming) 與 身份驗證 (Interceptor)

2025-12-23 12:22 | By justin | python gRPC
(Updated: 2025-12-23 12:25)

告別 Hello World!實作檔案上傳 (Streaming) 與 身份驗證 (Interceptor)

各位好!歡迎回到 gRPC 實戰系列的第二篇。

在上一篇《用 Python 搭建你的第一個 gRPC 服務》中,我們成功跑通了最基礎的 Hello World。但你肯定會心想:「現實世界的開發哪有這麼簡單?」

沒錯,在真實的微服務架構中,我們經常會遇到兩個棘手問題:

大資料傳輸:如果要傳送一個 500MB 的影片檔,傳統 REST 可能會逾時,而如果一次性把檔案讀進記憶體再傳,伺服器肯定會 OOM (Out of Memory)。

安全性與中介軟體:我們不希望在每個 API 函數裡都手寫 if check_token(): 這樣的檢查邏輯,這樣既難維護又容易漏掉。

今天我們就來一一擊破,實作 gRPC 的 客戶端串流 (Client Streaming) 與 攔截器 (Interceptor)。

核心概念

  1. 什麼是 Streaming? gRPC 支援串流傳輸,這意味著我們不需要等待所有資料都準備好才發送。以「檔案上傳」為例,我們可以把一個大檔案切成無數個 64KB 的小區塊 (Chunk),像流水一樣源源不絕地送到 Server。Server 也是邊收邊處理,記憶體永遠只佔用那一小塊的大小。

  2. 什麼是 Interceptor? 如果你寫過 Web 後端,它就是 Middleware (中介軟體)。它像是一個守門員,擋在客戶端請求和你的業務邏輯之間。我們可以在這裡統一處理 Log 紀錄、身份驗證 (Auth)、或是 錯誤處理。

實作開始

第一步:修改合約 (helloworld.proto) 我們修改一下之前的 .proto 檔,新增一個 UploadImage 方法,並使用 stream 關鍵字。

// helloworld.proto
syntax = "proto3";

package helloworld;

// 定義服務
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  // 注意:輸入參數前加了 'stream',表示這是一個連續的資料流
  rpc UploadImage (stream ImageChunk) returns (UploadStatus) {}
}

// 一般請求
message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

message ImageChunk {
  bytes data = 1;
}

// 上傳狀態 (串流結束後回傳)
message UploadStatus {
  string message = 1;
  int32 size = 2;
}

記得執行生成程式的指令:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

第二步:實作 Server 與 攔截器 (server.py)

這裡我們要做兩件事: 寫一個 AuthInterceptor 類別,檢查 Metadata 中的 Token。 在 Greeter 類別中,用 for 迴圈接收串流資料。

import logging
from concurrent import futures
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

class AuthInterceptor(grpc.ServerInterceptor):
    def __init__(self, secret_key):
        self._secret_key = secret_key

    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization')

        if token != self._secret_key:
            def abort(ignored_request, context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, '無效的 Token,請滾蛋!')

            return grpc.unary_unary_rpc_method_handler(abort)

        return continuation(handler_call_details)

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message=f'Hello, {request.name}! 驗證通過!')

    def UploadImage(self, request_iterator, context):
        print("--- 開始接收檔案串流 ---")
        total_size = 0
        data_buffer = bytearray()
        # Client 每送一塊資料,這裡就會跑一次迴圈
        for chunk in request_iterator:
            size = len(chunk.data)
            total_size += size
            data_buffer.extend(chunk.data)#在真實環境下,應該邊收邊寫入
            print(f"收到資料塊: {size} bytes | 目前總計: {total_size} bytes")

        print("--- 檔案接收完畢 ---")

        with open("server_received_image.dat", "wb") as f:
            f.write(data_buffer)

        # 回傳最終結果
        return helloworld_pb2.UploadStatus(
            message="上傳成功",
            size=total_size
        )

# --- 3. 啟動伺服器 ---
def serve():
    # 設定攔截器 (假設密鑰是 "super-secret")
    interceptors = [AuthInterceptor('super-secret')]

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptors
    )

    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')

    print("Server 啟動中 (Port 50051)...")
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()

第三步:實作 Client 與 生成器 (client.py) 在 Client 端,我們使用 Python 的 yield 語法來建立生成器,實現「邊讀邊傳」。同時,在發起請求時,我們要帶上 metadata 通過驗證。

import logging
import os
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

# 產生一個假的測試檔案
def create_dummy_file(filename, size_in_mb=5):
    if not os.path.exists(filename):
        print(f"正在建立測試檔案 {filename} ({size_in_mb}MB)...")
        with open(filename, "wb") as f:
            f.write(os.urandom(1024 * 1024 * size_in_mb))

# --- 檔案切片生成器 (Generator) ---
# 這就是 Streaming 的關鍵!
def generate_chunks(filename, chunk_size=1024 * 64): # 每次讀 64KB
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield helloworld_pb2.ImageChunk(data=chunk)

def run():
    # 準備 5MB 的測試檔案
    test_filename = "large_test_image.dat"
    create_dummy_file(test_filename)

    # 建立連線
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)

        # 定義 Metadata (通關密語)
        valid_metadata = (('authorization', 'super-secret'),)
        invalid_metadata = (('authorization', 'wrong-password'),)

        print("\n=== 測試 1: 身份驗證 (正確 Token) ===")
        try:
            response = stub.SayHello(
                helloworld_pb2.HelloRequest(name='Python大神'),
                metadata=valid_metadata # <--- 記得帶上這張票
            )
            print(f"Server 回應: {response.message}")
        except grpc.RpcError as e:
            print(f"錯誤: {e.details()}")

        print("\n=== 測試 2: 身份驗證 (錯誤 Token) ===")
        try:
            response = stub.SayHello(
                helloworld_pb2.HelloRequest(name='駭客'),
                metadata=invalid_metadata
            )
            print(f"Server 回應: {response.message}")
        except grpc.RpcError as e:
            print(f"被攔截成功! 錯誤代碼: {e.code()} | 訊息: {e.details()}")

        print("\n=== 測試 3: 大檔案串流上傳 ===")
        try:
            # 注意:這裡我們把 generator 函數直接丟進去
            # gRPC 核心庫會自動迭代這個生成器,將資料一塊塊送出
            response = stub.UploadImage(
                generate_chunks(test_filename), 
                metadata=valid_metadata
            )
            print(f"上傳結果: {response.message} | Server 收到大小: {response.size} bytes")
        except grpc.RpcError as e:
             print(f"上傳失敗: {e.details()}")

if __name__ == '__main__':
    logging.basicConfig()
    run()

執行結果 開啟終端機運行 Server:

python server.py

開啟另一個終端機運行 Client:

python client.py

你應該會看到 Client 端顯示驗證成功與失敗的測試,並且看到 5MB 的檔案被切成許多小區塊,順利上傳到 Server,而不會瞬間佔用大量記憶體。

總結

透過今天的實戰,我們學會了 gRPC 兩個非常強大的功能:

Streaming:用生成器 (Generator) 模式優雅地傳輸大檔案,解決頻寬與記憶體瓶頸。

Interceptor:用 AOP (剖面導向程式設計) 的思維來統一處理驗證,讓業務邏輯保持乾淨。

掌握了這些,你已經具備了用 gRPC 開發正式生產環境服務的基礎能力了。 下一篇預告:

寫好了程式,在自己的電腦上跑雖然很爽,但要怎麼部署到雲端呢?下一篇我們將進入維運環節:「如何將 Python gRPC 服務 Docker 化,並解決容器內常見的編譯問題」。


0 留言

目前沒有留言

發表留言
回覆