Composite sources in Malloy are virtualized sources which are backed by multiple other tables or sources. This is useful for handling cubed / rolled up data.
Syntax
source: composite_source is compose(base_one, base_two, ...) extend {
  // ... extra definitions
}
Simple Example
Suppose our airports table has billions of rows instead of the thousands it actually has. Querying such a table would be extremely expensive, and most often not very useful. A common strategy for analysis on extremely large datasets is to roll up or "cube" the data. Let's look at how composite sources can help us query this kind of data.
One kind of analysis that might be common with our airports table is to look at the number of airports by state. We'll start with a simplified airports source with only the airport_code and state fields.
source: airports is duckdb.table('../data/airports.parquet') extend {
  accept: code, state
  rename: airport_code is code
}
We can roll up this table by performing a query like this:
run: airports -> {
  group_by: state
  aggregate: airport_count is count()
}
[ { "state": "TX", "airport_count": 1845 }, { "state": "CA", "airport_count": 984 }, { "state": "IL", "airport_count": 890 }, { "state": "FL", "airport_count": 856 }, { "state": "PA", "airport_count": 804 } ]
SELECT base."state" as "state", COUNT(1) as "airport_count" FROM '../data/airports.parquet' as base GROUP BY 1 ORDER BY 2 desc NULLS LAST
Now suppose we store the result in a table called state_facts. We can then create a source based on that table which will allow us to perform analysis on the number of airports per state much more efficiently.
Note how we rename airport_count to state_airport_count, then redefine airport_count as a sum of the state_airport_count values.
source: state_facts is duckdb.table('../data/state_facts.parquet') extend {
  accept: state, airport_count
  rename: state_airport_count is airport_count
  measure: airport_count is state_airport_count.sum()
}

run: state_facts -> {
  group_by: state
  aggregate: airport_count
  order_by: airport_count desc
}
[ { "state": "TX", "airport_count": 1845 }, { "state": "CA", "airport_count": 984 }, { "state": "IL", "airport_count": 890 }, { "state": "FL", "airport_count": 856 }, { "state": "PA", "airport_count": 804 } ]
SELECT base."state" as "state", COALESCE(SUM(base."airport_count"),0) as "airport_count" FROM '../data/state_facts.parquet' as base GROUP BY 1 ORDER BY 2 desc NULLS LAST
This is great, but if we have many different levels of rollup which we would like to query consistently, having to choose which base source to use is error-prone and tedious. This is where composite sources help us. We can define a composite source airports_composite, which is backed by both airports and state_facts. Depending on the query, Malloy will automatically select which to use.
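The definition of airports_composite itself is not shown in this section. A minimal sketch, assuming it follows the compose() syntax given under Syntax and lists state_facts first so that the rollup is preferred whenever it can answer the query (the input order here is an assumption, not taken from the original):

```malloy
// Sketch only: the order of the inputs is an assumption, chosen so that
// state_facts is used for queries it can answer and airports is the fallback.
source: airports_composite is compose(state_facts, airports)
```

Because composite sources are experimental, the model may also need the corresponding experimental compiler flag enabled; that flag is not shown here.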
Now if we run a query that only uses state and airport_count, Malloy will use state_facts, but if we use airport_code, it will fall back to the full airports table. The SQL generated for each query shows that it is backed by the appropriate table.
run: airports_composite -> { group_by: state; aggregate: airport_count}
[ { "state": "TX", "airport_count": 1845 }, { "state": "CA", "airport_count": 984 }, { "state": "IL", "airport_count": 890 }, { "state": "FL", "airport_count": 856 }, { "state": "PA", "airport_count": 804 } ]
SELECT base."state" as "state", COALESCE(SUM(base."airport_count"),0) as "airport_count" FROM '../data/state_facts.parquet' as base GROUP BY 1 ORDER BY 2 desc NULLS LAST
run: airports_composite -> { select: airport_code, state }
[ { "airport_code": "1Q9", "state": null }, { "airport_code": "Q51", "state": null }, { "airport_code": "3N1", "state": null }, { "airport_code": "03N", "state": null }, { "airport_code": "ANG", "state": null } ]
SELECT base."code" as "airport_code", base."state" as "state" FROM '../data/airports.parquet' as base
Composite Sources Based on Partitions
Another common pattern is to define multiple levels of rollup in the same table with different partitions. We can generate a table with multiple levels of rollup by using GROUPING SETS and GROUPING_ID.
SELECT
  code,
  state,
  county,
  fac_type,
  CASE GROUPING_ID(code, state, county, fac_type)
    WHEN 0 THEN 'code.state.county.fac_type'
    WHEN 8 THEN 'state.county.fac_type'
    WHEN 9 THEN 'state.county'
    WHEN 10 THEN 'state.fac_type'
    WHEN 12 THEN 'county.fac_type'
    WHEN 11 THEN 'state'
    WHEN 13 THEN 'county'
    WHEN 14 THEN 'fac_type'
    ELSE ''
  END AS 'rollup',
  COUNT() AS 'num_facilities',
  COUNT(CASE WHEN fac_type='HELIPORT' THEN 1 END) AS 'num_heliports',
  COUNT(CASE WHEN fac_type='AIRPORT' THEN 1 END) AS 'num_airports',
  COUNT(CASE WHEN cntl_twr='Y' THEN 1 END) AS 'num_control_towers'
FROM '../data/airports.parquet'
GROUP BY GROUPING SETS (
  (code, state, county, fac_type),
  (state, county, fac_type),
  (state, county),
  (state, fac_type),
  (county, fac_type),
  (state),
  (county),
  (fac_type),
  ()
)
source: facilities_with_rollups is duckdb.table('../data/facilities_with_rollups.parquet')

run: facilities_with_rollups -> {
  select: state, num_airports
  order_by: num_airports desc
  where: rollup = 'state'
}
[ { "state": "TX", "num_airports": 1389 }, { "state": "IL", "num_airports": 625 }, { "state": "CA", "num_airports": 569 }, { "state": "OH", "num_airports": 537 }, { "state": "FL", "num_airports": 511 } ]
SELECT base."state" as "state", base."num_airports" as "num_airports" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"='state' ORDER BY 2 desc NULLS LAST
run: facilities_with_rollups -> {
  select: state, county, num_airports
  order_by: num_airports desc
  where: rollup = 'state.county'
}
[ { "state": "CA", "county": "SAN BERNARDINO", "num_airports": 47 }, { "state": "TX", "county": "DENTON", "num_airports": 47 }, { "state": "AZ", "county": "MARICOPA", "num_airports": 46 }, { "state": "CA", "county": "KERN", "num_airports": 41 }, { "state": "AK", "county": "MATA-SUS BOROUGH", "num_airports": 41 } ]
SELECT base."state" as "state", base."county" as "county", base."num_airports" as "num_airports" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"='state.county' ORDER BY 3 desc NULLS LAST
Given this table, we can define our backing sources.
source: `facilities:code.state.county.fac_type` is facilities_with_rollups extend {
  where: rollup = 'code.state.county.fac_type'
}

source: `facilities:state.county.fac_type` is facilities_with_rollups extend {
  accept: rollup, state, county, fac_type, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.county.fac_type'
}

source: `facilities:state.county` is facilities_with_rollups extend {
  accept: rollup, state, county, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.county'
}

source: `facilities:state.fac_type` is facilities_with_rollups extend {
  accept: rollup, state, fac_type, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.fac_type'
}

source: `facilities:county.fac_type` is facilities_with_rollups extend {
  accept: rollup, county, fac_type, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'county.fac_type'
}

source: `facilities:state` is facilities_with_rollups extend {
  accept: rollup, state, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state'
}

source: `facilities:county` is facilities_with_rollups extend {
  accept: rollup, county, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'county'
}

source: `facilities:fac_type` is facilities_with_rollups extend {
  accept: rollup, fac_type, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'fac_type'
}

source: `facilities:` is facilities_with_rollups extend {
  accept: rollup, num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = ''
}

source: facilities is compose(
  `facilities:`,
  `facilities:fac_type`,
  `facilities:county`,
  `facilities:state`,
  `facilities:county.fac_type`,
  `facilities:state.fac_type`,
  `facilities:state.county`,
  `facilities:state.county.fac_type`,
  `facilities:code.state.county.fac_type`
) extend {
  measure:
    facility_count is num_facilities.sum()
    airport_count is num_airports.sum()
    heliport_count is num_heliports.sum()
    control_tower_count is num_control_towers.sum()
}

run: facilities -> { aggregate: facility_count, heliport_count }
[ { "facility_count": 19793, "heliport_count": 5135 } ]
SELECT COALESCE(SUM(base."num_facilities"),0) as "facility_count", COALESCE(SUM(base."num_heliports"),0) as "heliport_count" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"=''
run: facilities -> {
  group_by: state
  aggregate: facility_count, heliport_count
}
[ { "state": "TX", "facility_count": 1845, "heliport_count": 435 }, { "state": "CA", "facility_count": 984, "heliport_count": 396 }, { "state": "IL", "facility_count": 890, "heliport_count": 245 }, { "state": "FL", "facility_count": 856, "heliport_count": 280 }, { "state": "PA", "facility_count": 804, "heliport_count": 307 } ]
SELECT base."state" as "state", COALESCE(SUM(base."num_facilities"),0) as "facility_count", COALESCE(SUM(base."num_heliports"),0) as "heliport_count" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"='state' GROUP BY 1 ORDER BY 2 desc NULLS LAST
run: facilities -> {
  group_by: fac_type
  aggregate: heliport_count
}
[ { "fac_type": "HELIPORT", "heliport_count": 5135 }, { "fac_type": "STOLPORT", "heliport_count": 0 }, { "fac_type": "AIRPORT", "heliport_count": 0 }, { "fac_type": "GLIDERPORT", "heliport_count": 0 }, { "fac_type": "SEAPLANE BASE", "heliport_count": 0 } ]
SELECT base."fac_type" as "fac_type", COALESCE(SUM(base."num_heliports"),0) as "heliport_count" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"='fac_type' GROUP BY 1 ORDER BY 2 desc NULLS LAST
run: facilities -> { select: code, state }
[ { "code": "ANG", "state": null }, { "code": "N18", "state": null }, { "code": "P16", "state": null }, { "code": "TKK", "state": null }, { "code": "BCV", "state": "AK" } ]
SELECT base."code" as "code", base."state" as "state" FROM '../data/facilities_with_rollups.parquet' as base WHERE base."rollup"='code.state.county.fac_type'
Here we can see that in each of these queries, we use the same 'facilities_with_rollups.parquet' table, but with different filters on rollup depending on which fields are used.
Detailed Behavior
Which input source is chosen depends on which fields are used in a given query: the first source listed in compose() that has definitions for all fields used in the query is selected. If a composite source is joined into another source (composite or not), queries against that source will also resolve the composite source according to the same strategy.
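To make the ordering rule concrete, here is a hedged sketch that reuses the airports and state_facts sources from the simple example. The names airports_with_count, rollup_first, and detail_first are hypothetical and introduced only for illustration.

```malloy
// Hypothetical: give the detailed source its own airport_count so that
// both inputs define every field used by the queries below.
source: airports_with_count is airports extend {
  measure: airport_count is count()
}

// state_facts is listed first, so a query on state and airport_count
// should resolve to the rolled-up table.
source: rollup_first is compose(state_facts, airports_with_count)

// airports_with_count is listed first and also defines state and
// airport_count, so the same query should resolve to the detailed table.
source: detail_first is compose(airports_with_count, state_facts)

run: rollup_first -> { group_by: state; aggregate: airport_count }
run: detail_first -> { group_by: state; aggregate: airport_count }
```

Under the rule described above, both queries should return the same counts, but only the first would read from the rollup. This is why the facilities example lists its most aggregated inputs first and the fully detailed partition last.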
Limitations of the Experiment
Views and joins defined in backing sources are not supported; currently they are ignored with a warning.
Composite fields used in on clauses of always joins which are not referenced elsewhere in the query will not be counted.
Joins of composite sources appearing in extend blocks in subsequent stages will not be resolved, resulting in bad SQL generation (SQL will contain placeholder fields, and will not compile).
Indexing a composite source will not work, and will generate SQL with placeholder values; no validation of cube fields will happen.