Sources, and in particular, extended sources, are Malloy's primary unit of reusability for defining computations, join relationships, and queries.
Malloy separates queries into a data source and a view on that data. A source can be thought of as a table and a collection extensions that are relevant to that table, including measures (aggregate functions), dimensions (scalar calculations), view definitions, and join relationships to other sources.
A source can be any of the following:
Source type | Example |
---|---|
A SQL table or SQL view | duckdb.table('data/flights.parquet') |
A Malloy query | flights -> { group_by: carrier } |
A SQL query | duckdb.sql("""select 1 as one""") |
A source can be used directly in a query:
run: duckdb.table('../data/flights.parquet') -> { aggregate: flight_count is count() }
[ { "flight_count": 344827 } ]
SELECT COUNT(1) as "flight_count" FROM '../data/flights.parquet' as base
Or, more commonly, it can be declared with a name so that it can be reused:
source: flights is duckdb.table('../data/flights.parquet') run: flights -> { aggregate: flight_count is count() }
[ { "flight_count": 344827 } ]
SELECT COUNT(1) as "flight_count" FROM '../data/flights.parquet' as base
Any of these kinds of sources can be extended to add reusable definitions or other modifications.
source: flights2 is duckdb.table('../data/flights.parquet') extend { measure: flight_count is count() }
Sources from Tables or Views
A source can be created from a SQL table or SQL view from a connected database.
When defining a source in this way, all the columns from the source table are available for use in field definitions or views against the source.
run: flights3 -> { // Columns from the source table are available group_by: carrier origin aggregate: flight_count is count() limit: 3 }
[ { "carrier": "AA", "origin": "DFW", "flight_count": 8742 }, { "carrier": "NW", "origin": "MSP", "flight_count": 8662 }, { "carrier": "DL", "origin": "ATL", "flight_count": 8419 } ]
SELECT base."carrier" as "carrier", base."origin" as "origin", COUNT(1) as "flight_count" FROM '../data/flights.parquet' as base GROUP BY 1,2 ORDER BY 3 desc NULLS LAST LIMIT 3
Sources from Malloy Queries
In Malloy, every query has an associated output schema, so it can be used as a source for other queries.
For example, in this model we define a query flights_by_carrier
:
query: flights_by_carrier is duckdb.table('../data/flights.parquet') -> { group_by: carrier aggregate: lifetime_flights is count() }
And here, we use the query flights_by_carrier
as a source:
run: flights_by_carrier -> { select: carrier lifetime_flights_bucketed is round(lifetime_flights, -4) lifetime_flights limit: 3 }
[ { "carrier": "DL", "lifetime_flights_bucketed": 30000, "lifetime_flights": 32130 }, { "carrier": "WN", "lifetime_flights_bucketed": 90000, "lifetime_flights": 88751 }, { "carrier": "AA", "lifetime_flights_bucketed": 30000, "lifetime_flights": 34577 } ]
WITH __stage0 AS ( SELECT base."carrier" as "carrier", COUNT(1) as "lifetime_flights" FROM '../data/flights.parquet' as base GROUP BY 1 ) SELECT base."carrier" as "carrier", ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed", base."lifetime_flights" as "lifetime_flights" FROM __stage0 as base LIMIT 3
We can also explicitly define the query as a source, which is useful when adding reusable computations:
source: carrier_facts is flights_by_carrier extend { dimension: lifetime_flights_bucketed is round(lifetime_flights, -4) } run: carrier_facts -> { select: carrier lifetime_flights_bucketed lifetime_flights limit: 3 }
[ { "carrier": "RU", "lifetime_flights_bucketed": 20000, "lifetime_flights": 16074 }, { "carrier": "OH", "lifetime_flights_bucketed": 0, "lifetime_flights": 4420 }, { "carrier": "CO", "lifetime_flights_bucketed": 10000, "lifetime_flights": 7139 } ]
WITH __stage0 AS ( SELECT base."carrier" as "carrier", COUNT(1) as "lifetime_flights" FROM '../data/flights.parquet' as base GROUP BY 1 ) SELECT base."carrier" as "carrier", ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed", base."lifetime_flights" as "lifetime_flights" FROM __stage0 as base LIMIT 3
Here we referenced the query name flights_by_carrier
, but we can also define a source by writing a query inline and then extend
ing it:
source: carrier_facts2 is duckdb.table('../data/flights.parquet') -> { group_by: carrier aggregate: lifetime_flights is count() } extend { dimension: lifetime_flights_bucketed is round(lifetime_flights, -4) } run: carrier_facts2 -> { select: carrier, lifetime_flights_bucketed, lifetime_flights limit: 3 }
[ { "carrier": "RU", "lifetime_flights_bucketed": 20000, "lifetime_flights": 16074 }, { "carrier": "OH", "lifetime_flights_bucketed": 0, "lifetime_flights": 4420 }, { "carrier": "CO", "lifetime_flights_bucketed": 10000, "lifetime_flights": 7139 } ]
WITH __stage0 AS ( SELECT base."carrier" as "carrier", COUNT(1) as "lifetime_flights" FROM '../data/flights.parquet' as base GROUP BY 1 ) SELECT base."carrier" as "carrier", ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed", base."lifetime_flights" as "lifetime_flights" FROM __stage0 as base LIMIT 3
Sources from SQL Queries
Sources can be created from a SQL query, e.g.
source: limited_users is duckdb.sql(""" SELECT first_name, last_name, gender FROM '../data/users.parquet' LIMIT 100 """) run: limited_users -> { group_by: first_name aggregate: user_count is count() }
[ { "first_name": "JAMES", "user_count": 4 }, { "first_name": "WILLIAM", "user_count": 4 }, { "first_name": "RANDY", "user_count": 4 }, { "first_name": "ROBERT", "user_count": 3 }, { "first_name": "DANIEL", "user_count": 3 } ]
SELECT base."first_name" as "first_name", COUNT(1) as "user_count" FROM ( SELECT first_name, last_name, gender FROM '../data/users.parquet' LIMIT 100 ) as base GROUP BY 1 ORDER BY 2 desc NULLS LAST
Like with duckdb.table('data/users.parquet')
, Malloy fetches the schema from the database to make columns of the resulting table accessible in computations.
Source Extensions
Any source can be extended to add filters, specify a primary key, add fields and joins, rename fields, or limit which fields are available.
Extensions are often added when defining a source for the first time:
source: flights5 is duckdb.table('../data/flights.parquet') extend { measure: flight_count is count() }
An existing source can also be extended, then given a new name or used in a query.
source: flights6 is duckdb.table('../data/flights.parquet') source: flights_ext is flights6 extend { measure: flight_count is count() } run: flights6 extend { measure: total_distance is sum(distance) # percent measure: percent_distance is total_distance / all(total_distance) } -> { group_by: carrier aggregate: total_distance, percent_distance }
[ { "carrier": "WN", "total_distance": 54619152, "percent_distance": 0.21390989275965064 }, { "carrier": "UA", "total_distance": 38882934, "percent_distance": 0.15228072823467806 }, { "carrier": "AA", "total_distance": 37684885, "percent_distance": 0.14758870128576448 }, { "carrier": "NW", "total_distance": 33376503, "percent_distance": 0.13071539773122362 }, { "carrier": "US", "total_distance": 23721642, "percent_distance": 0.09290319806325123 } ]
WITH __stage0 AS ( SELECT group_set, CASE WHEN group_set=1 THEN base."carrier" END as "carrier__1", CASE WHEN group_set=1 THEN COALESCE(SUM(base."distance"),0) END as "total_distance__1", (CASE WHEN group_set=1 THEN COALESCE(SUM(base."distance"),0) END)*1.0/MAX((CASE WHEN group_set=0 THEN COALESCE(SUM(base."distance"),0) END)) OVER () as "percent_distance__1" FROM '../data/flights.parquet' as base CROSS JOIN (SELECT UNNEST(GENERATE_SERIES(0,1,1)) as group_set ) as group_set GROUP BY 1,2 ) SELECT "carrier__1" as "carrier", MAX(CASE WHEN group_set=1 THEN "total_distance__1" END) as "total_distance", MAX(CASE WHEN group_set=1 THEN "percent_distance__1" END) as "percent_distance" FROM __stage0 WHERE group_set NOT IN (0) GROUP BY 1 ORDER BY 2 desc NULLS LAST
The following subsections document the various kinds of source extensions.
Adding Fields
Fields—dimensions, measures, and views—may be defined as part of a source extension, allowing for them to be used in any query against the source, or in other fields within that source.
source: airports2 is duckdb.table('../data/airports.parquet') extend { dimension: has_control_tower is cntl_twr = 'Y' measure: airport_count is count() average_elevation is avg(elevation) view: average_elevation_by_control_tower is { group_by: has_control_tower aggregate: average_elevation } } run: airports2 -> { group_by: state aggregate: airport_count nest: average_elevation_by_control_tower limit: 2 }
[ { "state": "TX", "airport_count": 1845, "average_elevation_by_control_tower": [ { "has_control_tower": false, "average_elevation": 899.2608453837597 }, { "has_control_tower": true, "average_elevation": 861.2340425531914 } ] }, { "state": "CA", "airport_count": 984, "average_elevation_by_control_tower": [ { "has_control_tower": false, "average_elevation": 1130.6059275521404 }, { "has_control_tower": true, "average_elevation": 552.3561643835617 } ] } ]
WITH __stage0 AS ( SELECT group_set, base."state" as "state__0", CASE WHEN group_set=0 THEN COUNT(1) END as "airport_count__0", CASE WHEN group_set=1 THEN base."cntl_twr"='Y' END as "has_control_tower__1", CASE WHEN group_set=1 THEN AVG(base."elevation") END as "average_elevation__1" FROM '../data/airports.parquet' as base CROSS JOIN (SELECT UNNEST(GENERATE_SERIES(0,1,1)) as group_set ) as group_set GROUP BY 1,2,4 ) SELECT "state__0" as "state", MAX(CASE WHEN group_set=0 THEN "airport_count__0" END) as "airport_count", COALESCE(LIST({ "has_control_tower": "has_control_tower__1", "average_elevation": "average_elevation__1"} ORDER BY "average_elevation__1" desc NULLS LAST) FILTER (WHERE group_set=1),[]) as "average_elevation_by_control_tower" FROM __stage0 GROUP BY 1 ORDER BY 2 desc NULLS LAST LIMIT 2
For more information about fields and how to define them, see the Fields section, or for information on views specifically, see the Views section.
Filtering Sources
Filters can be added as a source extension with a where:
clause. These filters apply to any query against the source.
source: flights7 is duckdb.table('../data/flights.parquet') source: long_sfo_flights is flights7 extend { where: origin = 'SFO' and distance > 1000 } run: long_sfo_flights -> { group_by: destination aggregate: flight_count is count() limit: 3 }
[ { "destination": "ORD", "flight_count": 362 }, { "destination": "DFW", "flight_count": 304 }, { "destination": "MSP", "flight_count": 281 } ]
SELECT base."destination" as "destination", COUNT(1) as "flight_count" FROM '../data/flights.parquet' as base WHERE (base."origin"='SFO') and (base."distance">1000) GROUP BY 1 ORDER BY 2 desc NULLS LAST LIMIT 3
Primary Keys
To be used in with
-style joins to other sources, a source must
have a primary key specified as an extension.
Joins
When sources are joined as part of their definition, queries can reference fields in the joined sources without having to specify the join relationship each time.
source: carriers2 is duckdb.table('../data/carriers.parquet') extend { primary_key: code } source: flights9 is duckdb.table('../data/flights.parquet') extend { join_one: carriers with carrier measure: flight_count is count() } run: flights9 -> { group_by: carriers.nickname aggregate: flight_count limit: 3 }
[ { "nickname": "Southwest", "flight_count": 88751 }, { "nickname": "USAir", "flight_count": 37683 }, { "nickname": "American", "flight_count": 34577 } ]
SELECT carriers_0."nickname" as "nickname", COUNT(1) as "flight_count" FROM '../data/flights.parquet' as base LEFT JOIN '../data/carriers.parquet' AS carriers_0 ON carriers_0."code"=base."carrier" GROUP BY 1 ORDER BY 2 desc NULLS LAST LIMIT 3
See the Joins section for more information on working with joins.
Renaming Fields
Fields from a source may be renamed in the context of the new source. This is useful when the original name is not descriptive, or has a different meaning in the new context.
source: airports5 is duckdb.table('../data/airports.parquet') extend { rename: facility_type is fac_type }
Limiting Access to Fields
The list of fields available in a source can be limited. This can be done either by accept
ing a list of fields to include (in which case any other field from the source is excluded, i.e. an "allow list") or by except
ing a list of fields to exclude (any other field is included, i.e. a "deny list"). These cannot be used in conjunction with one another.
In this example, we define airports
to extend
the airports.parquet
table, but we limit the included columns to only id
, code
, city
, state
, and elevation
.
source: airports6 is duckdb.table('../data/airports.parquet') extend { accept: id, code, city, state, elevation }
Here, we do the same, but instead of specifying which columns to include, we specify to include all columns except c_ldg_rts
, aero_cht
, and cntl_twr
.