Google Cloud Pub/Subのエミュレータで非同期pullを試す手順

Google Cloud Pub/Subのエミュレータで非同期pullを試す手順
Page content

PythonでCloud Pub/Subの非同期pull 実装をエミュレータで試した手順を紹介します。

環境

Google Cloud Shellで試しました。

Google アカウントがあれば無料で利用できます。
https://cloud.google.com/blog/ja/topics/anthos/introducing-the-anthos-developer-sandbox

Pythonなどもインストール済みのため、いろいろとお試しするのに便利です。

Pub/Subエミュレータの使用

大枠は公式の手順通りです。
https://cloud.google.com/pubsub/docs/emulator?hl=ja

  • emulatorのインストール
    こちらは私のGoogle Cloud Shellには既に入っていたので実施していません。
    入っていない場合は下記のコマンドでインストールできるようです。

    gcloud components install pubsub-emulator
    gcloud components update
    
  • エミュレータの開始
    公式のコマンドのままではうまくいかず、「–data-dir」のオプションを指定しています。

    gcloud beta emulators pubsub start --project=batch --data-dir=/tmp
    
  • 環境変数の設定
    こちらはエミュレータの開始に合わせて「–data-dir」のオプションを指定しています。

    $(gcloud beta emulators pubsub env-init --data-dir=/tmp)
    
  • 環境変数の確認

    echo ${PUBSUB_EMULATOR_HOST}
    
  • エミュレータの停止
    https://cloud.google.com/pubsub/docs/emulator?hl=ja#stop
    起動中のエミュレータをCtrl + Cで停止し、下記コマンドを実行します。

    unset PUBSUB_EMULATOR_HOST
    

Google Cloudクライアントライブラリのインストール

  • google-cloud-pubsubのインストール
    シンプルに下記コマンドでインストールしました。
    バージョン2.10.0がインストールされました。
    pip3 install google-cloud-pubsub
    

PubSubのトピックとサブスクリプションの作成

  • サンプルコード
    若干一覧表示など不要なこともしていますが、下記を実行するとトピックとサブスクリプションを作成できます。

    ※こちらはエミュレータを起動する度に実行が必要です。
    from google import pubsub_v1
    
    project_id = 'HOGE' # こちらは実際のproject_idに合わせて指定が必要です
    
    # topicリスト取得
    publisher = pubsub_v1.PublisherClient()
    request = pubsub_v1.ListTopicsRequest(project=publisher.common_project_path(project_id))
    topics = publisher.list_topics(request=request)
    n = 0
    for element in topics:
        print('exist: ', element)
        n += 1
    
    # topic作成
    topic_name = 'test'
    topic_path = publisher.topic_path(project_id, topic_name)
    request = pubsub_v1.Topic(name=topic_path)  
    if n == 0:
        response = publisher.create_topic(request=request)
        print('Topic created: {}'.format(response))
    
    # subscriptionリスト取得
    subscriber = pubsub_v1.SubscriberClient()
    request = pubsub_v1.ListSubscriptionsRequest(project=subscriber.common_project_path(project_id))
    subscriptions = subscriber.list_subscriptions(request=request)
    n = 0
    for element in subscriptions:
        print('exist: ', element)
        n += 1
    
    # subscription作成
    subscription_name = 'test_sub'
    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    request = pubsub_v1.Subscription(
        name=subscription_path,
        topic=topic_path,
    )
    if n == 0:
        response = subscriber.create_subscription(request=request)
        print('Subscription created: {}'.format(response))
    

非同期pull

  • サンプルコード
    公式の非同期pull のPythonの実装をほぼ持ってきて、メッセージのパブリッシュなどの実装を付け足しました。

    一度のpullで取得するメッセージを10件ずつ、2スレッドで動作するようにしているつもりです。
    from google.cloud import pubsub_v1
    from concurrent.futures import TimeoutError
    import time
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
    import concurrent.futures.thread
    
    def push(project_id):
        topic_id = "test"
        publisher = pubsub_v1.PublisherClient()
        # The `topic_path` method creates a fully qualified identifier
        # in the form `projects/{project_id}/topics/{topic_id}`
        topic_path = publisher.topic_path(project_id, topic_id)
    
        cnt = 0
        for n in range(1, 5):
            data_str = f"Message number {n}"
            # Data must be a bytestring
            data = data_str.encode("utf-8")
            # When you publish a message, the client returns a future.
            future = publisher.publish(topic_path, data)
            print(future.result())
            cnt += 1
    
        if cnt > 0:
            print(f"Published messages to {topic_path}.")
    
    
    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received {message}.")
        print(f"Received {message.message_id}.")
        message.ack()
    
    
    def pull(project_id):
        subscriber = pubsub_v1.SubscriberClient()
        # in the form `projects/{project_id}/subscriptions/{subscription_id}`
        subscription_id = 'test_sub'
        subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
        # Max messages and threads
        flow_control = pubsub_v1.types.FlowControl(max_messages=10)
        scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(2))
    
        streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control, scheduler=scheduler)
        print(f"Listening for messages on {subscription_path}..\n")
    
        with subscriber:
            try:
                streaming_pull_future.result(timeout=5)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
    
    
    project_id = "batch"
    push(project_id)
    
    n = 0
    while True:
        pull(project_id)
        n += 1
        if n == 3:
            break