[A-00223] Apache Iceberg入門(Python)

pythonでapache icebergを使ってみます。

まずは仮想環境を作成します。

python3 -m venv .venv

次にLibraryをインストール

pip install pyspark

次にicebergのjarファイルをダウンロードして所定のフォルダに配置します。

https://iceberg.apache.org/releases

次に上記の[spark-warehouse]を作成しておきます。ここにicebergデータが作成されます。

サンプルコードは以下の通りです。

下記はテーブルを作成してデータをインサートするプログラムです。最初に実行してください。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type", "hadoop") \
  .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()
  
spark.sql("SHOW DATABASES").show()

# Create an Iceberg table
spark.sql("""
  CREATE TABLE local.schema.students (
    id INT,
    name STRING,
    age INT,
    subject STRING
  ) USING iceberg""")

# Insert some sample data
spark.sql("""
  INSERT INTO local.schema.students VALUES
    (1, 'Alice', 12, 'mathematics'),
    (2, 'Bob', 11, 'literature'),
    (3, 'Charlie', 9, 'music instruments')""")

# Query the data
result = spark.sql("SELECT * FROM local.schema.students")
result.show()
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13b1ab6e-3afb-400a-be60-5029399151f7;1.0
        confs: [default]
        found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 136ms :: artifacts dl 3ms
        :: modules in use:
        org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-13b1ab6e-3afb-400a-be60-5029399151f7
        confs: [default]
        0 artifacts copied, 1 already retrieved (0kB/4ms)
24/12/28 03:34:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---------+
|namespace|
+---------+
|  default|
+---------+

+---+-------+---+-----------------+                                             
| id|   name|age|          subject|
+---+-------+---+-----------------+
|  1|  Alice| 12|      mathematics|
|  2|    Bob| 11|       literature|
|  3|Charlie|  9|music instruments|
+---+-------+---+-----------------+

次にデータをアップデートするプログラムを実行します。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName("IcebergLocalDevelopment") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2') \
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type", "hadoop") \
  .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
  .getOrCreate()
  
spark.sql("SHOW DATABASES").show()


spark.sql("""
          UPDATE local.schema.students SET age=8 WHERE id=3
          """)

# Query the data
result = spark.sql("SELECT * FROM local.schema.students ORDER BY id")
result.show()
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-06aefbb8-09dc-47ff-b84d-032296064d7c;1.0
        confs: [default]
        found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 87ms :: artifacts dl 4ms
        :: modules in use:
        org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-06aefbb8-09dc-47ff-b84d-032296064d7c
        confs: [default]
        0 artifacts copied, 1 already retrieved (0kB/4ms)
24/12/28 03:37:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---------+
|namespace|
+---------+
|  default|
+---------+

+---+-------+---+-----------------+                                             
| id|   name|age|          subject|
+---+-------+---+-----------------+
|  1|  Alice| 12|      mathematics|
|  2|    Bob| 11|       literature|
|  3|Charlie|  8|music instruments|
+---+-------+---+-----------------+

・Appendix

参考文献はこちら

https://iceberg.apache.org/releases

https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-update.html

https://karlchris.github.io/data-engineering/projects/spark-iceberg/#setup-iceberg-session

https://medium.com/@tglawless/developing-with-apache-iceberg-pyspark-2727957f173f

https://app.datacamp.com/learn/tutorials/apache-iceberg

https://dtnavi.tcdigital.jp/cat_system/language_015

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

*