Malloy Documentation
search

Composite sources in Malloy are virtualized sources which are backed by multiple other tables or sources. This is useful for handling cubed / rolled up data.

document
##! experimental { composite_sources }

Syntax

source: composite_source is compose(base_one, base_two, ...) extend {
  // ... extra definitions
}

Simple Example

Suppose our airports table has billions of rows instead of the thousands it actually has. Querying such a table would be extremely expensive, and most often not very useful. A common strategy for analysis on extremely large datasets is to roll up or "cube" the data. Let's look at how composite sources can help us query this kind of data.

One kind of analysis that might be common with our airports table is to look at number of airports by state. We'll start with a simplified airports source with only the airport_code and state.

document
source: airports is duckdb.table('../data/airports.parquet') extend {
  accept: code, state
  rename: airport_code is code
}

We can roll up this table by performing a query like this:

document
run: airports -> {
  group_by: state
  aggregate: airport_count is count()
}
QUERY RESULTS
[
  {
    "state": "TX",
    "airport_count": 1845
  },
  {
    "state": "CA",
    "airport_count": 984
  },
  {
    "state": "IL",
    "airport_count": 890
  },
  {
    "state": "FL",
    "airport_count": 856
  },
  {
    "state": "PA",
    "airport_count": 804
  }
]
SELECT 
   base."state" as "state",
   COUNT(1) as "airport_count"
FROM '../data/airports.parquet' as base
GROUP BY 1
ORDER BY 2 desc NULLS LAST

Now suppose we store the result in a table called state_facts. We can then create a source based on that table which will allow us to perform analysis on number of aiports per state much more efficiently.

Note how we rename airport_count to state_airport_count, then redefine airport_count as a sum of state_airport_counts.

document
source: state_facts is duckdb.table('../data/state_facts.parquet') extend {
  accept: state, airport_count
  rename: state_airport_count is airport_count
  measure: airport_count is state_airport_count.sum()
}

run: state_facts -> { 
  group_by: state
  aggregate: airport_count
  order_by: airport_count desc
}
QUERY RESULTS
[
  {
    "state": "TX",
    "airport_count": 1845
  },
  {
    "state": "CA",
    "airport_count": 984
  },
  {
    "state": "IL",
    "airport_count": 890
  },
  {
    "state": "FL",
    "airport_count": 856
  },
  {
    "state": "PA",
    "airport_count": 804
  }
]
SELECT 
   base."state" as "state",
   COALESCE(SUM(base."airport_count"),0) as "airport_count"
FROM '../data/state_facts.parquet' as base
GROUP BY 1
ORDER BY 2 desc NULLS LAST

This is great, but if we have many different levels of rollup which we would like to query consistently, having to choose which base source to use is error-prone and tedious. This is where composite sources help us. We can define a composite source airports_composite, which is backed by both airports and state_facts. Depending on the query, Malloy will automatically select which to use.

document
source: airports_composite is compose(state_facts, airports)

Now if we run a query that only uses state and airport_count, we will use state_facts, but if we use airport_code, we will fall back to the full airports table. You can view the SQL tab to see that each query is backed by the appropriate table.

document
run: airports_composite -> { group_by: state; aggregate: airport_count}
QUERY RESULTS
[
  {
    "state": "TX",
    "airport_count": 1845
  },
  {
    "state": "CA",
    "airport_count": 984
  },
  {
    "state": "IL",
    "airport_count": 890
  },
  {
    "state": "FL",
    "airport_count": 856
  },
  {
    "state": "PA",
    "airport_count": 804
  }
]
SELECT 
   base."state" as "state",
   COALESCE(SUM(base."airport_count"),0) as "airport_count"
FROM '../data/state_facts.parquet' as base
GROUP BY 1
ORDER BY 2 desc NULLS LAST
document
run: airports_composite -> { select: airport_code, state }
QUERY RESULTS
[
  {
    "airport_code": "1Q9",
    "state": null
  },
  {
    "airport_code": "Q51",
    "state": null
  },
  {
    "airport_code": "3N1",
    "state": null
  },
  {
    "airport_code": "03N",
    "state": null
  },
  {
    "airport_code": "ANG",
    "state": null
  }
]
SELECT 
   base."code" as "airport_code",
   base."state" as "state"
FROM '../data/airports.parquet' as base

Composite Sources Based on Partitions

Another common pattern is to define multiple levels of roll up in the same table with different partitions. We can generate a table with multiple levels of rollup by using GROUPING SETS and GROUPING_ID.

SELECT 
  code,
  state, 
  county,
  fac_type,
  CASE GROUPING_ID(code, state, county, fac_type) 
    WHEN 0 THEN 'code.state.county.fac_type'
    WHEN 8 THEN 'state.county.fac_type'
    WHEN 9 THEN 'state.county'
    WHEN 10 THEN 'state.fac_type'
    WHEN 12 THEN 'county.fac_type'
    WHEN 11 THEN 'state'
    WHEN 13 THEN 'county'
    WHEN 14 THEN 'fac_type'
    ELSE ''
  END AS 'rollup',
  COUNT() AS 'num_facilities',
  COUNT(CASE WHEN fac_type='HELIPORT' THEN 1 END) AS 'num_heliports',
  COUNT(CASE WHEN fac_type='AIRPORT' THEN 1 END) AS 'num_airports',
  COUNT(CASE WHEN cntl_twr='Y' THEN 1 END) AS 'num_control_towers'
FROM '../data/airports.parquet' 
GROUP BY GROUPING SETS (
  (code, state, county, fac_type),
  (state, county, fac_type),
  (state, county),
  (state, fac_type),
  (county, fac_type),
  (state),
  (county),
  (fac_type),
  ()
)
document
source: facilities_with_rollups is duckdb.table('../data/facilities_with_rollups.parquet')

run: facilities_with_rollups -> { 
  select: state, num_airports
  order_by: num_airports desc
  where: rollup = 'state' 
}
QUERY RESULTS
[
  {
    "state": "TX",
    "num_airports": 1389
  },
  {
    "state": "IL",
    "num_airports": 625
  },
  {
    "state": "CA",
    "num_airports": 569
  },
  {
    "state": "OH",
    "num_airports": 537
  },
  {
    "state": "FL",
    "num_airports": 511
  }
]
SELECT 
   base."state" as "state",
   base."num_airports" as "num_airports"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"='state'
ORDER BY 2 desc NULLS LAST
document
run: facilities_with_rollups -> { 
  select: state, county, num_airports
  order_by: num_airports desc
  where: rollup = 'state.county' 
}
QUERY RESULTS
[
  {
    "state": "CA",
    "county": "SAN BERNARDINO",
    "num_airports": 47
  },
  {
    "state": "TX",
    "county": "DENTON",
    "num_airports": 47
  },
  {
    "state": "AZ",
    "county": "MARICOPA",
    "num_airports": 46
  },
  {
    "state": "CA",
    "county": "KERN",
    "num_airports": 41
  },
  {
    "state": "AK",
    "county": "MATA-SUS BOROUGH",
    "num_airports": 41
  }
]
SELECT 
   base."state" as "state",
   base."county" as "county",
   base."num_airports" as "num_airports"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"='state.county'
ORDER BY 3 desc NULLS LAST

Given this table, we can define our backing sources.

document
source: `facilities:code.state.county.fac_type` is facilities_with_rollups extend {
  where: rollup = 'code.state.county.fac_type'
}

source: `facilities:state.county.fac_type` is facilities_with_rollups extend {
  accept: 
    rollup, 
    state, county, fac_type,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.county.fac_type'
}

source: `facilities:state.county` is facilities_with_rollups extend {
  accept: 
    rollup, 
    state, county,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.county'
}

source: `facilities:state.fac_type` is facilities_with_rollups extend {
  accept: 
    rollup, 
    state, fac_type,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state.fac_type'
}

source: `facilities:county.fac_type` is facilities_with_rollups extend {
  accept: 
    rollup, 
    county, fac_type,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'county.fac_type'
}

source: `facilities:state` is facilities_with_rollups extend {
  accept: 
    rollup, 
    state,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'state'
}

source: `facilities:county` is facilities_with_rollups extend {
  accept: 
    rollup, 
    county,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'county'
}

source: `facilities:fac_type` is facilities_with_rollups extend {
  accept: 
    rollup, 
    fac_type,
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = 'fac_type'
}

source: `facilities:` is facilities_with_rollups extend {
  accept: 
    rollup, 
    num_facilities, num_airports, num_heliports, num_control_towers
  where: rollup = ''
}

source: facilities is compose(
  `facilities:`,
  `facilities:fac_type`,
  `facilities:county`,
  `facilities:state`,
  `facilities:county.fac_type`,
  `facilities:state.fac_type`,
  `facilities:state.county`,
  `facilities:state.county.fac_type`,
  `facilities:code.state.county.fac_type`
) extend {
  measure:
    facility_count is num_facilities.sum()
    airport_count is num_airports.sum()
    heliport_count is num_heliports.sum()
    control_tower_count is num_control_towers.sum()
}

run: facilities -> { 
  aggregate: facility_count, heliport_count
}
QUERY RESULTS
[
  {
    "facility_count": 19793,
    "heliport_count": 5135
  }
]
SELECT 
   COALESCE(SUM(base."num_facilities"),0) as "facility_count",
   COALESCE(SUM(base."num_heliports"),0) as "heliport_count"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"=''
document
run: facilities -> { 
  group_by: state 
  aggregate: facility_count, heliport_count
}
QUERY RESULTS
[
  {
    "state": "TX",
    "facility_count": 1845,
    "heliport_count": 435
  },
  {
    "state": "CA",
    "facility_count": 984,
    "heliport_count": 396
  },
  {
    "state": "IL",
    "facility_count": 890,
    "heliport_count": 245
  },
  {
    "state": "FL",
    "facility_count": 856,
    "heliport_count": 280
  },
  {
    "state": "PA",
    "facility_count": 804,
    "heliport_count": 307
  }
]
SELECT 
   base."state" as "state",
   COALESCE(SUM(base."num_facilities"),0) as "facility_count",
   COALESCE(SUM(base."num_heliports"),0) as "heliport_count"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"='state'
GROUP BY 1
ORDER BY 2 desc NULLS LAST
document
run: facilities -> { 
  group_by: fac_type
  aggregate: heliport_count
}
QUERY RESULTS
[
  {
    "fac_type": "HELIPORT",
    "heliport_count": 5135
  },
  {
    "fac_type": "STOLPORT",
    "heliport_count": 0
  },
  {
    "fac_type": "AIRPORT",
    "heliport_count": 0
  },
  {
    "fac_type": "GLIDERPORT",
    "heliport_count": 0
  },
  {
    "fac_type": "SEAPLANE BASE",
    "heliport_count": 0
  }
]
SELECT 
   base."fac_type" as "fac_type",
   COALESCE(SUM(base."num_heliports"),0) as "heliport_count"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"='fac_type'
GROUP BY 1
ORDER BY 2 desc NULLS LAST
document
run: facilities -> { 
  select: code, state 
}
QUERY RESULTS
[
  {
    "code": "ANG",
    "state": null
  },
  {
    "code": "N18",
    "state": null
  },
  {
    "code": "P16",
    "state": null
  },
  {
    "code": "TKK",
    "state": null
  },
  {
    "code": "BCV",
    "state": "AK"
  }
]
SELECT 
   base."code" as "code",
   base."state" as "state"
FROM '../data/facilities_with_rollups.parquet' as base
WHERE base."rollup"='code.state.county.fac_type'

Here we can see that in each of these queries, we use the same 'facilities_with_rollups.parquet' table, but with different filters on rollup depending on which fields are used.

Detailed Behavior

Which input source is chosen depends on which fields are used in a given query: the first source listed in the compose() which has definitions for all fields used in the query will be selected. If a composite source is joined into another source (composite or not), queries against that source will also resolve the composite source according to the same strategy.

Limitations of the Experiment

  • Views and joins defined in backing sources are not supported; currently they are ignored with a warning

  • Composite fields used in on clauses of always joins which are not referenced elsewhere in the query will not be counted

  • Joins of composite sources appearing in extend blocks in subsequent stages will not be resolved, resulting in bad SQL generation (SQL will contain placeholder fields, and will not compile)

  • Indexing a composite source will not work, and will generate SQL with placeholder values; no validation of cube fields will happen