こんばんは。七色メガネです。
今日は、Spanner で 20000ミューテーション以上の一括更新などを実行する方法を紹介します。
Spanner ってなに?
Spanner は、GCP で提供される分散型のRDBです。次の記事で Spanner についての紹介を行なっていますので、よろしければご参照ください。
Spanner における一括更新の制限について
Spanner における制限事項
Spanner にはその使用に当たっていくつかの制限事項があります。例えば次のような事項です。
- 1ノードあたりのサイズは 2T まで
- JOIN による結合は 15回 まで
- Commit サイズは 100M まで
- Commit あたりのミューテーションは 20000 まで
今回のテーマである一括更新を行おうとするときに考慮しなければいけないのが、[1 Commit では 20000 ミューテーション以下の操作しか行えない]、という制限です。端的に言えば、一度の更新で10万行や100万行の更新は基本的に行えない、ということです。
[参考]
https://cloud.google.com/spanner/quotas?hl=ja
ミューテーションとは
ミューテーションとは、オペレーションの影響を受ける列数を表現する単位のことです。1つの update 文で 1 レコードの全列 が更新されるとして、そのレコードが 10 列から構成されるものであれば、この時のミューテーションは 10 になります。
したがって 20000 ミューテーションの制限とは、「1度のオペレーションで影響を受ける列数は常に 20000 以下でなくてはいけない」ということを意味します。
10列を持つテーブルの全列更新であれば一度に 2000 レコードまで、10000列を持つテーブルの全列更新であれば一度の 2 レコードまでの操作しか許容されません。
なお、列数がそのままミューテーションとして数えらえれるのは、挿入と更新のオペレーションについてのみです。削除については列数はミューテーションとして数えられず、削除したい行数がそのままミューテーションとなります。
パーティションDMLについて
Spanner ではミューテーション上限に引っかかるような DML の発行について、パーティションDML という機能がサポートされています。これは大量データの一括更新などのために設計されており、通常の 20000 ミューテーションの上限に制約されることなくオペレーションを実行することができます。
パーティションDML の特徴は、主に次の通りです。
- GCPコンソールからは実行できない
- 1つのオペレーションには1つのDMLステートメントのみ含めることができる。(サブクエリなどは使用できない)
- Insert はサポートされていない
- クエリプラン(SQL実行計画) についての機能はサポートされていない
つまり、Spanner を外部プログラムから操作することができる場合における一括更新について、このパーティションDML を用いることで問題を解決することができます。
[参考]
https://cloud.google.com/spanner/docs/dml-partitioned?hl=ja
データ準備とミューテーション上限に引っかかる挙動の確認
では実際に、パーティションDMLを用いて一括更新を実行してみましょう。
まずはテストデータの作成から始めます。
テストデータの準備
あらかじめ、テスト用のテーブルを Spanner 上に作成しておきます。
今回はミューテションの計測をしやすくするため、特に意味のない列を 10 個用意したテーブルを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE tranz ( id1 STRING(MAX) NOT NULL, id10 STRING(MAX) NOT NULL, id2 STRING(MAX) NOT NULL, id3 STRING(MAX) NOT NULL, id4 STRING(MAX) NOT NULL, id5 STRING(MAX) NOT NULL, id6 STRING(MAX) NOT NULL, id7 STRING(MAX) NOT NULL, id8 STRING(MAX) NOT NULL, id9 STRING(MAX) NOT NULL, ) PRIMARY KEY (id1) |
次にこのテストテーブルにデータを用意します。
今回は update を実行するときにミューテーション上限に引っかかる挙動も確認したいので、テストデータはそれなりの数を用意したいと思います。
テストテーブルの列数が 10 ですから、全ての列を更新すると仮定したときに必要なレコード数は、 10 * 2000 = 20000 で、最低 2000 レコードです。
とりあえず、10000 レコードほど用意したいと思います。
1 2 3 4 5 6 7 |
select count(*) from tranz; ---------------------------------- クエリ完了(24.26ms経過) 未設定 10000 ---------------------------------- |
はい、tranz テーブルに 10000 件のデータが用意できました。これで一括更新のテストを行う準備ができました。
ミューテーション上限に引っかけてみる
ではパーティションDMLを実行して見る前に、ミューテーションの上限に引っ掛けてみましょう。
現在、10 列のテーブルに 10000 レコードが存在します。ミューテーションの計算は オペレーションが行われる列数 * レコード 数ですから、6000レコードを更新すると仮定したら、3列までは通常のDMLで行えますが、4列では上限に引っかかります。
ただ、今回はサブクエリで対象のレコードを limit して update をかけています。ミューテーションはサブクエリで取得されるレコードについても計算対象となりますから、今回のミューテーション計算は、「サブクエリで取得されるレコード数 + アップデートされる列数 * レコード数」で行われます。
結論として、次のSQLで更新できるのは、5000レコードまでです(select 5000 + update 3列 * 5000 = 20000)。5001レコードからはミューテーション上限にかかります(select 5001 + update 3列 * 5001 = 20004)。
文章では分かりにくいですね。実際にやってみましょう。まずは 5000 レコードの更新です。操作はGCPコンソールから行います。
1 2 3 4 5 6 7 8 |
update tranz set id2 = 'test', id3 = 'test', id4 = 'test' where id1 in ( select id1 from tranz limit 5000 ) |
1 2 3 4 |
5000 行が 更新 されました このステートメントで 5000 行が 更新 され、どの行も返されませんでした クエリ完了(127.49ms |
成功しました。では 5001 レコードではどうでしょうか。
1 2 3 4 5 6 7 8 |
update tranz set id2 = 'test', id3 = 'test', id4 = 'test' where id1 in ( select id1 from tranz limit 5001 ) |
1 2 3 4 5 |
この DML ステートメントでは、単一トランザクションのミューテーションの上限(20,000)を超えています。 ミューテーションの数を減らすには、書き込み数が少ないトランザクションを試すか、使用するインデックスを減らしてください。 このようにすると、オペレーションのミューテーションの数と、ミューテーションの影響を受ける列の数が等しくなるため、問題解決に役立ちます。 書き込みまたはインデックスを減らすと、影響を受ける列の数が減るため、ミューテーションの数が上限以下になります。 または、クライアント ライブラリか gcloud コマンドライン ツールを使用するパーテーション化 DML ステートメントをお試しください。 |
想定通り、エラーになりました。
パーティションDMLの実行(Python)
では、ミューテーション上限を回避するパーティションDMLを実行してみましょう。
前述の通り、パーティションDMLはコンソール上からは実行できません。いくつか実行の方法はありますが、今回は外部プログラム(Python)から専用のライブラリを使用して実行してみたいと思います。
Spanner にアクセスするためのサービスアカウントの用意
まずはSpannerに外部アクセスするためのサービスアカウントを用意します。
- GCPコンソールの [IAMと管理] > [サービスアカウント] を選択します。
- [サービスアカウントの作成] を選択し、任意の名称を付与します。
- [役割の選択] を選択し、[Project – オーナー] 権限を付与します。
- [キーの作成] を選択し、発行される JSONキー を任意の場所に保存します。
テスト環境の画像が公開できなかったので文章だけでごめんなさい。
上記の操作で、Spanner にアクセスするためのサービスアカウントとキーが発行されます。ここで作成されたキーを用いて、Spanner へのアクセスを実現します。
ライブラリの準備
Spanner 関連の操作を行うためのライブラリとして、google-cloud-spanner を使用します。pip でインストールしてください。
https://pypi.org/project/google-cloud-spanner/
実装
では実装していきます。今回は4ファイルを作成します。
- main.py
処理ロジックの呼び出しと結果の表示を行う。 - ctlSpa.py
処理ロジックを実行する。 - conf.py
環境情報などを保持する。 - doMyTest.sh
環境変数を設定し、main.py を実行する。
main.py
特筆事項はないです。ctlSpa の処理を呼び出し、その結果を出力するだけのsrcです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import ctlSpa def main(): results = ctlSpa.executeMassUpdate() display(results) def display(results): printBlank() print("------------------ Start Display SQL Execution Results ------------------ ") if(type(results) is str): print(results) else: for row in results: print(row) print("------------------ Finish Display SQL Execution Results ------------------ ") printBlank() def printBlank(): for i in range(2): print('\n') if __name__ == "__main__": main() |
conf.py
接続する Spanner の インスタンスIDとデータベースIDを保持させます。
1 2 |
INSTANCE_ID = 'インスタンスのID' DATABASE_ID = 'データベースのID' |
ctlSpa.py
ここでやることは2つです。
- データベース情報とインスタンス情報を使用して、Spanner 接続のためのデータベースクラスを生成する。
- 生成されたデータベースクラスの execute_partition_dml メソッドを使用し、DMLを実行する。
ライブラリの使用方法は、下記の src を参照してください。
今回はDMLを直接プログラムの中に書き込んでいます。
ここで、Spanner での update には必ず where 句が必要であることに注意してください。Spanner では where 指定なしでの全件更新処理を許容していないので、全件更新をしたい場合には全件が引っかかるような where 文を指定する必要があります。
また、パーティションDMLとして実行できるのは単一のステートメントであることも忘れないでください。サブクエリを使用した update などは、パーティションDML では実行することができません。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import conf import uuid from google.cloud import spanner def executeMassUpdate(): instance, database = prepare() results = executeSQLMassUpdate(instance, database) return results def prepare(): instance_id = conf.INSTANCE_ID database_id = conf.DATABASE_ID spanner_client = spanner.Client() instance = spanner_client.instance(instance_id) database = instance.database(database_id) return instance, database def executeSQLMassUpdate(instance, database): results = database.execute_partitioned_dml( ''' update tranz set id2 = 'bulk_update', id3 = 'bulk_update', id4 = 'bulk_update', id5 = 'bulk_update', id6 = 'bulk_update', id7 = 'bulk_update', id8 = 'bulk_update', id9 = 'bulk_update', id10 = 'bulk_update' where id1 != 'test' ''' ) resultText = "{} records updated.".format(results) return resultText |
doMyTest.sh
最後に、main.py を実行するためのシェルです。
別にこれは必須ではないのですが、Spanner へのアクセスを行うためには環境変数
1 2 3 4 5 |
#!/bin/bash export GOOGLE_APPLICATION_CREDENTIALS='取得したJSONキーの位置' eval python ./myTestSrc/main.py |
実行
準備が整いました。では、パーティションDMLを実行してみましょう。今回は 9列 * 10000 レコードの更新を行なっているのでミューテーションは 90000 です。
1 2 3 4 5 6 |
$ sh doMyTest.sh ------------------ Start Display SQL Execution Results ------------------ 10000 records updated. ------------------ Finish Display SQL Execution Results ------------------ |
はい、ミューテーション20000 を大きく超えるDMLでしたが、問題なく実行できました。一応レコードの内容を確認します。
1 |
select * from tranz limit 10 |
1 2 3 4 5 6 7 8 9 10 11 |
id1 id10 id2 id3 id4 id5 id6 id7 id8 id9 00032a60-f221-4b79-b320-739fe1654606 bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 0004105b-682b-4543-96c6-09fbdeb9e58e bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 0005e160-f501-4f55-880c-1636e58164d2 bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 001c6b9c-07d6-4f6e-becf-e4bd25b7b30e bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 0024a487-7511-4df6-a8c9-273680c012fc bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 00294e09-179b-403b-920f-8447a3263ffd bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 0031c20b-eaf8-4bca-817c-7ff87586a5f1 bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 003238c8-f9c3-4292-8549-8288115c90cf bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 003342e5-5f11-41a5-ae41-0cbf3fdf1c84 bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update 003735f8-91b4-42c0-ba08-b0c54fe66257 bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update bulk_update |
問題なく処理が行われていることが確認できました。
余談ですが、パーティションDMLを実行した時の実行件数の戻り値は必ずしも正確ではないようです。10000件程度であれば件数はブレませんが、50万件以上の update などになってくると、どうも実際にupdateした件数と戻り値の値が乖離することが発生するようです。
まとめ
・Spanner には、「実行するオペレーションが 20000ミューテーション 以下でなければならない」という制約がある。
・ミューテーションとは、「オペレーションの影響を受ける列数 * レコード数」を意味する。
・20000 ミューテーション以上のDMLを実行したい場合には、パーティションDML を使用する。
・パーティションDML はGCPコンソールからの実行ができない。
・パーティションDML では単一のステートメントしか実行ができない。
以上です。ここまでご覧いただき、ありがとうございました!
余談:20000 ミューテーションを超える Insert の実行
今回はテストデータとして 10000 のレコードを用意しました。では、この 10000 レコードはどうやって用意したのでしょうか。
残念ながら今回紹介した パーティションDML は、Insert に対応していません。したがって、Insert 処理はミューテーション上限に引っかからないように実行するしか、現在は術がありません。
今回は、次のようなロジックで Insert を実行しています。仕組みは単純で、20000ミューテーション以下になるように一度のInsert数を制限しながら処理ループを回しているだけです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
import conf import uuid from google.cloud import spanner def executeMassInsertToTestTranz(): instance, database = prepare() results = executeSQLMassInsertToTranz(instance, database) return results def prepare(): instance_id = conf.INSTANCE_ID database_id = conf.DATABASE_ID spanner_client = spanner.Client() instance = spanner_client.instance(instance_id) database = instance.database(database_id) return instance, database def executeSQLMassInsertToTranz(instance, database): results = None tableName, columns, processList = getInsertInformation(10001) length = len(processList) if length >= 1000: before_index = 0 after_index = 1000 while True: with database.batch() as batch: batch.insert( table = tableName, columns = columns, values = processList[before_index:after_index] ) print("insert running...{0}/{1}".format(after_index, length)) before_index = after_index after_index += 1000 if after_index > length: break else: with database.batch() as batch: batch.insert( table = tableName, columns = columns, values = processList ) results = "Insert is done." return results def getInsertInformation(count): tableName = "tranz" columnName = ("id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8", "id9", "id10",) queryList = [] for i in range(count): query = (str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()), str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4())) queryList.append(query) return tableName, columnName, queryList |
スライス処理が微妙なのは、ご愛嬌…。
実行すると、こんな感じです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
$ sh doMyTest.sh insert running...1000/10001 insert running...2000/10001 insert running...3000/10001 insert running...4000/10001 insert running...5000/10001 insert running...6000/10001 insert running...7000/10001 insert running...8000/10001 insert running...9000/10001 insert running...10000/10001 ------------------ Start Display SQL Execution Results ------------------ Insert is done. ------------------ Finish Display SQL Execution Results ------------------ |
ゴリ押しですね!
参考
https://cloud.google.com/spanner/quotas?hl=ja
https://cloud.google.com/spanner/docs/dml-partitioned?hl=ja
書籍紹介
Google Cloud Platform エンタープライズ設計ガイド
Google Cloud Platform 実践Webアプリ開発 ストーリーで学ぶGoogle App Engine
プログラマのためのGoogle Cloud Platform入門 サービスの全体像からクラウドネイティブアプリケーション構築まで