[A-00223] Getting Started with Apache Iceberg (Python)
In this article, I try out Apache Iceberg from Python.
First, create a virtual environment.
python3 -m venv .venv
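Next, install the library. The Iceberg runtime used in this article (iceberg-spark-runtime-3.5_2.12) is built for Spark 3.5, so pin PySpark to the 3.5 series:
pip install "pyspark==3.5.*"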
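Next, download the Iceberg runtime jar from the releases page below and place it in the designated folder. Because the sample code also sets spark.jars.packages, Spark can fetch this jar from Maven Central by itself. The dependency-resolution log in the output shows this happening.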
https://iceberg.apache.org/releases
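The runtime's Maven coordinate encodes the Spark line and the Scala version it was built for, so it has to match the installed PySpark. Below is a minimal sketch that derives the coordinate used in this article from a PySpark version string. The function name runtime_coordinate is hypothetical. The defaults (Scala 2.12, Iceberg 1.5.2) are assumptions copied from the sample code.

```python
def runtime_coordinate(pyspark_version: str,
                       scala_version: str = "2.12",
                       iceberg_version: str = "1.5.2") -> str:
    """Build the Iceberg Spark runtime coordinate (hypothetical helper).

    The artifact name follows iceberg-spark-runtime-<spark major.minor>_<scala>.
    """
    # Keep only "major.minor" of the Spark version, e.g. "3.5.3" -> "3.5"
    major, minor = pyspark_version.split(".")[:2]
    spark_line = f"{major}.{minor}"
    return (f"org.apache.iceberg:iceberg-spark-runtime-"
            f"{spark_line}_{scala_version}:{iceberg_version}")
```

For example, after the pinned install, runtime_coordinate(pyspark.__version__) should produce the same string as the spark.jars.packages value in the sample code. If you download the jar manually, the same artifact name identifies the file to pick.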
data:image/s3,"s3://crabby-images/c8f1e/c8f1efb60ebb93eaca417b75efa482b165a315d5" alt=""
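Next, create a [spark-warehouse] folder in the working directory. The Iceberg data is written under it, in spark-warehouse/iceberg, which is the warehouse path set in the sample code.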
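The folder can also be created from Python. This sketch uses only pathlib. The WAREHOUSE path mirrors spark.sql.catalog.local.warehouse in the sample code, and the helper names ensure_warehouse and table_location are hypothetical. With a hadoop-type catalog, each table lives under <warehouse>/<namespace>/<table>, with data/ and metadata/ subfolders.

```python
from pathlib import Path

# Mirrors spark.sql.catalog.local.warehouse in the sample code (a relative path)
WAREHOUSE = Path("spark-warehouse") / "iceberg"


def ensure_warehouse(path: Path = WAREHOUSE) -> Path:
    """Create the warehouse folder and any missing parents; no error if it exists."""
    path.mkdir(parents=True, exist_ok=True)
    return path


def table_location(warehouse: Path, namespace: str, table: str) -> Path:
    """Folder where a hadoop-type Iceberg catalog keeps namespace.table."""
    return warehouse / namespace / table
```

Run ensure_warehouse() from the directory where you run the sample code. After the first program below has run, table_location(WAREHOUSE, "schema", "students") points at the folder holding the students table's data/ and metadata/ subfolders.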
The sample code is as follows.
The program below creates a table and inserts data into it. Run it first.
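from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("IcebergLocalDevelopment")
    # Fetch the Iceberg runtime (Spark 3.5, Scala 2.12) from Maven Central
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
    # Iceberg SQL extensions (required for UPDATE / DELETE / MERGE INTO)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Register an Iceberg catalog named "local" that stores tables as files under the warehouse path
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg")
    .getOrCreate()
)
# Lists namespaces of the current catalog (spark_catalog by default), not of "local",
# which is why only "default" appears in the output
spark.sql("SHOW DATABASES").show()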
# Create an Iceberg table
spark.sql("""
    CREATE TABLE local.schema.students (
        id INT,
        name STRING,
        age INT,
        subject STRING
    ) USING iceberg""")
# Insert some sample data
spark.sql("""
    INSERT INTO local.schema.students VALUES
        (1, 'Alice', 12, 'mathematics'),
        (2, 'Bob', 11, 'literature'),
        (3, 'Charlie', 9, 'music instruments')""")
# Query the data
result = spark.sql("SELECT * FROM local.schema.students")
result.show()
Output:
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13b1ab6e-3afb-400a-be60-5029399151f7;1.0
confs: [default]
found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 136ms :: artifacts dl 3ms
:: modules in use:
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-13b1ab6e-3afb-400a-be60-5029399151f7
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/4ms)
24/12/28 03:34:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---------+
|namespace|
+---------+
|  default|
+---------+
+---+-------+---+-----------------+
| id|   name|age|          subject|
+---+-------+---+-----------------+
|  1|  Alice| 12|      mathematics|
|  2|    Bob| 11|       literature|
|  3|Charlie|  9|music instruments|
+---+-------+---+-----------------+
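Both programs in this article build the session with the same configuration. As a minimal sketch, the settings can be kept in one dictionary and applied in a loop. The names iceberg_conf, build_session and ICEBERG_PACKAGE are hypothetical. pyspark is imported inside build_session, so the dictionary can be inspected without starting Spark.

```python
ICEBERG_PACKAGE = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2"


def iceberg_conf(catalog: str = "local",
                 warehouse: str = "spark-warehouse/iceberg") -> dict:
    """Spark settings used by the sample code for a hadoop-type Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.jars.packages": ICEBERG_PACKAGE,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,
    }


def build_session(app_name: str = "IcebergLocalDevelopment"):
    """Create (or reuse) a SparkSession with the Iceberg settings applied."""
    from pyspark.sql import SparkSession  # lazy import; requires pyspark installed

    builder = SparkSession.builder.appName(app_name)
    for key, value in iceberg_conf().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With this helper, each program could start with spark = build_session() instead of repeating the builder chain. This is a design choice for shorter scripts, not something Iceberg requires.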
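Next, run a program that updates the data. UPDATE is a row-level operation, and with Iceberg in Spark it relies on the Iceberg SQL extensions configured through spark.sql.extensions.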
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("IcebergLocalDevelopment")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg")
    .getOrCreate()
)
spark.sql("SHOW DATABASES").show()
spark.sql("""
UPDATE local.schema.students SET age=8 WHERE id=3
""")
# Query the data
result = spark.sql("SELECT * FROM local.schema.students ORDER BY id")
result.show()
Output:
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-06aefbb8-09dc-47ff-b84d-032296064d7c;1.0
confs: [default]
found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 87ms :: artifacts dl 4ms
:: modules in use:
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-06aefbb8-09dc-47ff-b84d-032296064d7c
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/4ms)
24/12/28 03:37:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---------+
|namespace|
+---------+
|  default|
+---------+
+---+-------+---+-----------------+
| id|   name|age|          subject|
+---+-------+---+-----------------+
|  1|  Alice| 12|      mathematics|
|  2|    Bob| 11|       literature|
|  3|Charlie|  8|music instruments|
+---+-------+---+-----------------+
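Iceberg never edits data files in place. Each committed write, including this UPDATE, produces a new table snapshot, which can be inspected through the table's snapshots metadata table. The sketch below assumes an existing session configured as in the sample code, and the function name recent_snapshots is hypothetical. It only builds and issues the SQL query.

```python
def recent_snapshots(spark, table: str = "local.schema.students"):
    """Return a DataFrame of the table's snapshots, oldest first.

    `spark` is assumed to be a SparkSession configured as in the sample code.
    """
    # <table>.snapshots is an Iceberg metadata table with one row per commit
    query = (
        "SELECT committed_at, snapshot_id, operation "
        f"FROM {table}.snapshots ORDER BY committed_at"
    )
    return spark.sql(query)
```

After both programs have run, recent_snapshots(spark).show(truncate=False) would be expected to list one snapshot for the INSERT and one for the UPDATE. CREATE TABLE on its own does not commit a snapshot.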
・Appendix
References:
https://iceberg.apache.org/releases
https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-update.html
https://karlchris.github.io/data-engineering/projects/spark-iceberg/#setup-iceberg-session
https://medium.com/@tglawless/developing-with-apache-iceberg-pyspark-2727957f173f