Malloy provides a Python package that allows you to load Malloy model files, compile Malloy queries, and run Malloy statements on your database from within your Python application or script. The package is open source, MIT-licensed, and hosted in the malloy-py Github repo. It currently works with DuckDB and BigQuery.
Installation
The Malloy package is hosted on PyPI, and can be installed via:
pip install malloy
Usage
Import the malloy
package, and the connector for your database:
import asyncio import malloy from malloy.data.duckdb import DuckDbConnection
Then instantiate a Malloy Runtime
object (in the example below, we use a Python context manager), and pass it a database connection object.
with malloy.Runtime() as runtime: runtime.add_connection(DuckDbConnection(home_dir=home_dir))
The runtime
object has a few key methods:
add_connection(connection: ConnectionInterface)
: used above to connect the runtime to an actual database.load_file(str)
: Takes a filepath pointing to a file containing sources and/or queries, loads the sources and/or queries into the runtime.load_source(str)
: Takes a string containing Malloy code (source definitions and/or queries), loads the sources and/or queries into the runtime.get_sql(query: str, named_query: str)
: Takes either a string containing a Malloy query, or the name of a named query from a Malloy model. Compiles the query and returns the equivalent SQL string.run(query: str, named_query: str)
: Takes either a string containing a Malloy query, or the name of a named query from a Malloy model. Compiles the query, executes it using the database connection, and returns the results. Results are returned in the native format of the database connection (e.g., a DuckDB result object, or a BigQuery result object)
Examples
Run a named query from a Malloy file
import asyncio import malloy from malloy.data.duckdb import DuckDbConnection async def main(): home_dir = "/path/to/samples/duckdb/imdb" with malloy.Runtime() as runtime: runtime.add_connection(DuckDbConnection(home_dir=home_dir)) data = await runtime.load_file(home_dir + "/imdb.malloy").run( named_query="genre_movie_map") dataframe = data.to_dataframe() print(dataframe) if __name__ == "__main__": asyncio.run(main())
Get SQL from an in-line query, using a Malloy file as a source
import asyncio import malloy from malloy.data.duckdb import DuckDbConnection async def main(): home_dir = "/path/to/samples/duckdb/faa" with malloy.Runtime() as runtime: runtime.add_connection(DuckDbConnection(home_dir=home_dir)) [sql, connection ] = await runtime.load_file(home_dir + "/flights.malloy").get_sql(query=""" run: flights -> { where: carrier ? 'WN' | 'DL', dep_time ? @2002-03-03 group_by: flight_date is dep_time.day carrier aggregate: daily_flight_count is flight_count aircraft.aircraft_count nest: per_plane_data is { limit: 20 group_by: tail_num aggregate: plane_flight_count is flight_count nest: flight_legs is { order_by: 2 group_by: tail_num dep_minute is dep_time.minute origin_code dest_code is destination_code dep_delay arr_delay } } } """) print(sql) if __name__ == "__main__": asyncio.run(main())
Write an in-line Malloy model, and run a query
import asyncio import malloy from malloy.data.duckdb import DuckDbConnection async def main(): home_dir = "/path/to/samples/duckdb/imdb/data" with malloy.Runtime() as runtime: runtime.add_connection(DuckDbConnection(home_dir=home_dir)) data = await runtime.load_source(""" source:titles is duckdb.table('titles.parquet') extend { primary_key: tconst dimension: movie_url is concat('https://www.imdb.com/title/',tconst) } """).run(query=""" run: titles -> { group_by: movie_url limit: 5 } """) dataframe = data.to_dataframe() print(dataframe) if __name__ == "__main__": asyncio.run(main())
Querying BigQuary tables
BigQuery auth via OAuth using gcloud.
gcloud auth login --update-adc gcloud config set project {my_project_id} --installation
Actual usage is similar to DuckDB.
import asyncio import malloy from malloy.data.bigquery import BigQueryConnection async def main(): with malloy.Runtime() as runtime: runtime.add_connection(BigQueryConnection()) data = await runtime.load_source(""" source:ga_sessions is bigquery.table('bigquery-public-data.google_analytics_sample.ga_sessions_20170801') extend { measure: hits_count is hits.count() } """).run(query=""" run: ga_sessions -> { where: trafficSource.`source` != '(direct)' group_by: trafficSource.`source` aggregate: hits_count limit: 10 } """) dataframe = data.to_dataframe() print(dataframe) if __name__ == "__main__": asyncio.run(main())