Malloy Documentation

Sources, and extended sources in particular, are Malloy's primary unit of reusability for defining computations, join relationships, and queries.

Malloy separates queries into a data source and a view on that data. A source can be thought of as a table plus a collection of extensions that are relevant to that table, including measures (aggregate functions), dimensions (scalar calculations), view definitions, and join relationships to other sources.

A source can be any of the following:

Source type                Example
A SQL table or SQL view    duckdb.table('data/flights.parquet')
A Malloy query             flights -> { group_by: carrier }
A SQL query                duckdb.sql("""select 1 as one""")

A source can be used directly in a query:

run: duckdb.table('../data/flights.parquet') -> {
  aggregate: flight_count is count()
}
QUERY RESULTS
[
  {
    "flight_count": 344827
  }
]
SELECT 
   COUNT(1) as "flight_count"
FROM '../data/flights.parquet' as base

Or, more commonly, it can be declared with a name so that it can be reused:

source: flights is duckdb.table('../data/flights.parquet')

run: flights -> { aggregate: flight_count is count() }
QUERY RESULTS
[
  {
    "flight_count": 344827
  }
]
SELECT 
   COUNT(1) as "flight_count"
FROM '../data/flights.parquet' as base

Any of these kinds of sources can be extended to add reusable definitions or other modifications.

source: flights2 is duckdb.table('../data/flights.parquet') extend {
  measure: flight_count is count()
}
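
As a small sketch of why this is useful, a query against the extended source can refer to the measure by name instead of redefining it. This assumes the flights2 source defined above is in scope:

```malloy
// Assumes flights2 (defined above) is in scope.
// flight_count is reused from the source extension rather than
// being written out again as count() in the query.
run: flights2 -> {
  group_by: carrier
  aggregate: flight_count
}
```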

Sources from Tables or Views

A source can be created from a SQL table or SQL view from a connected database.

source: flights3 is duckdb.table('../data/flights.parquet')

When defining a source in this way, all the columns from the source table are available for use in field definitions or views against the source.

run: flights3 -> {
  // Columns from the source table are available
  group_by:
    carrier
    origin
  aggregate: flight_count is count()
  limit: 3
}
QUERY RESULTS
[
  {
    "carrier": "AA",
    "origin": "DFW",
    "flight_count": 8742
  },
  {
    "carrier": "NW",
    "origin": "MSP",
    "flight_count": 8662
  },
  {
    "carrier": "DL",
    "origin": "ATL",
    "flight_count": 8419
  }
]
SELECT 
   base."carrier" as "carrier",
   base."origin" as "origin",
   COUNT(1) as "flight_count"
FROM '../data/flights.parquet' as base
GROUP BY 1,2
ORDER BY 3 desc NULLS LAST
LIMIT 3

Sources from Malloy Queries

In Malloy, every query has an associated output schema, so it can be used as a source for other queries.

For example, in this model we define a query flights_by_carrier:

query: flights_by_carrier is duckdb.table('../data/flights.parquet') -> {
  group_by: carrier
  aggregate: lifetime_flights is count()
}

And here, we use the query flights_by_carrier as a source:

run: flights_by_carrier -> {
  select: 
    carrier
    lifetime_flights_bucketed is round(lifetime_flights, -4)
    lifetime_flights
  limit: 3
}
QUERY RESULTS
[
  {
    "carrier": "RU",
    "lifetime_flights_bucketed": 20000,
    "lifetime_flights": 16074
  },
  {
    "carrier": "OH",
    "lifetime_flights_bucketed": 0,
    "lifetime_flights": 4420
  },
  {
    "carrier": "CO",
    "lifetime_flights_bucketed": 10000,
    "lifetime_flights": 7139
  }
]
WITH __stage0 AS (
  SELECT 
     base."carrier" as "carrier",
     COUNT(1) as "lifetime_flights"
  FROM '../data/flights.parquet' as base
  GROUP BY 1
)
SELECT 
   base."carrier" as "carrier",
   ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed",
   base."lifetime_flights" as "lifetime_flights"
FROM __stage0 as base
LIMIT 3

We can also explicitly define the query as a source, which is useful when adding reusable computations:

source: carrier_facts is flights_by_carrier extend {
  dimension:
    lifetime_flights_bucketed is round(lifetime_flights, -4)
}

run: carrier_facts -> {
  select: 
    carrier
    lifetime_flights_bucketed
    lifetime_flights
  limit: 3
}
QUERY RESULTS
[
  {
    "carrier": "RU",
    "lifetime_flights_bucketed": 20000,
    "lifetime_flights": 16074
  },
  {
    "carrier": "OH",
    "lifetime_flights_bucketed": 0,
    "lifetime_flights": 4420
  },
  {
    "carrier": "CO",
    "lifetime_flights_bucketed": 10000,
    "lifetime_flights": 7139
  }
]
WITH __stage0 AS (
  SELECT 
     base."carrier" as "carrier",
     COUNT(1) as "lifetime_flights"
  FROM '../data/flights.parquet' as base
  GROUP BY 1
)
SELECT 
   base."carrier" as "carrier",
   ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed",
   base."lifetime_flights" as "lifetime_flights"
FROM __stage0 as base
LIMIT 3

Here we referenced the query name flights_by_carrier, but we can also define a source by writing a query inline and then extending it:

source: carrier_facts2 is duckdb.table('../data/flights.parquet') -> {
  group_by: carrier
  aggregate: lifetime_flights is count()
} extend {
  dimension: lifetime_flights_bucketed is round(lifetime_flights, -4)
}

run: carrier_facts2 -> {
  select: carrier, lifetime_flights_bucketed, lifetime_flights
  limit: 3
}
QUERY RESULTS
[
  {
    "carrier": "RU",
    "lifetime_flights_bucketed": 20000,
    "lifetime_flights": 16074
  },
  {
    "carrier": "OH",
    "lifetime_flights_bucketed": 0,
    "lifetime_flights": 4420
  },
  {
    "carrier": "CO",
    "lifetime_flights_bucketed": 10000,
    "lifetime_flights": 7139
  }
]
WITH __stage0 AS (
  SELECT 
     base."carrier" as "carrier",
     COUNT(1) as "lifetime_flights"
  FROM '../data/flights.parquet' as base
  GROUP BY 1
)
SELECT 
   base."carrier" as "carrier",
   ROUND(base."lifetime_flights",-4) as "lifetime_flights_bucketed",
   base."lifetime_flights" as "lifetime_flights"
FROM __stage0 as base
LIMIT 3

Sources from SQL Queries

A source can also be created from a SQL query, for example:

source: limited_users is duckdb.sql("""
  SELECT
    first_name,
    last_name,
    gender
  FROM '../data/users.parquet'
  LIMIT 100
""")

run: limited_users -> {
  group_by: first_name
  aggregate: user_count is count()
}
QUERY RESULTS
[
  {
    "first_name": "JAMES",
    "user_count": 4
  },
  {
    "first_name": "WILLIAM",
    "user_count": 4
  },
  {
    "first_name": "RANDY",
    "user_count": 4
  },
  {
    "first_name": "DANIEL",
    "user_count": 3
  },
  {
    "first_name": "ROBERT",
    "user_count": 3
  }
]
SELECT 
   base."first_name" as "first_name",
   COUNT(1) as "user_count"
FROM (
  SELECT
    first_name,
    last_name,
    gender
  FROM '../data/users.parquet'
  LIMIT 100
) as base
GROUP BY 1
ORDER BY 2 desc NULLS LAST

As with duckdb.table('data/users.parquet'), Malloy fetches the schema from the database so that the columns of the resulting table are accessible in computations.

Source Extensions

Any source can be extended to add filters, specify a primary key, add fields and joins, rename fields, or limit which fields are available.

Extensions are often added when defining a source for the first time:

source: flights5 is duckdb.table('../data/flights.parquet') extend {
  measure: flight_count is count()
}

An existing source can also be extended, then given a new name or used in a query.

source: flights6 is duckdb.table('../data/flights.parquet')

source: flights_ext is flights6 extend {
  measure: flight_count is count()
}

run: flights6 extend { 
  measure: total_distance is sum(distance) 
  # percent
  measure: percent_distance is total_distance / all(total_distance)
} -> {
  group_by: carrier
  aggregate: total_distance, percent_distance
}
QUERY RESULTS
[
  {
    "carrier": "WN",
    "total_distance": 54619152,
    "percent_distance": 0.21390989275965064
  },
  {
    "carrier": "UA",
    "total_distance": 38882934,
    "percent_distance": 0.15228072823467806
  },
  {
    "carrier": "AA",
    "total_distance": 37684885,
    "percent_distance": 0.14758870128576448
  },
  {
    "carrier": "NW",
    "total_distance": 33376503,
    "percent_distance": 0.13071539773122362
  },
  {
    "carrier": "US",
    "total_distance": 23721642,
    "percent_distance": 0.09290319806325123
  }
]
WITH __stage0 AS (
  SELECT
    group_set,
    CASE WHEN group_set=1 THEN
      base."carrier"
      END as "carrier__1",
    CASE WHEN group_set=1 THEN
      COALESCE(SUM(base."distance"),0)
      END as "total_distance__1",
    (CASE WHEN group_set=1 THEN
      COALESCE(SUM(base."distance"),0)
      END)*1.0/MAX((CASE WHEN group_set=0 THEN
      COALESCE(SUM(base."distance"),0)
      END)) OVER () as "percent_distance__1"
  FROM '../data/flights.parquet' as base
  CROSS JOIN (SELECT UNNEST(GENERATE_SERIES(0,1,1)) as group_set  ) as group_set
  GROUP BY 1,2
)
SELECT
  "carrier__1" as "carrier",
  MAX(CASE WHEN group_set=1 THEN "total_distance__1" END) as "total_distance",
  MAX(CASE WHEN group_set=1 THEN "percent_distance__1" END) as "percent_distance"
FROM __stage0
WHERE group_set NOT IN (0)
GROUP BY 1
ORDER BY 2 desc NULLS LAST

The following subsections document the various kinds of source extensions.

Adding Fields

Fields (dimensions, measures, and views) may be defined as part of a source extension, so that they can be used in any query against the source or in other fields within that source.

source: airports2 is duckdb.table('../data/airports.parquet') extend {
  dimension: has_control_tower is cntl_twr = 'Y'

  measure:
    airport_count is count()
    average_elevation is avg(elevation)

  view: average_elevation_by_control_tower is {
    group_by: has_control_tower
    aggregate: average_elevation
  }
}

run: airports2 -> {
  group_by: state
  aggregate: airport_count
  nest: average_elevation_by_control_tower
  limit: 2
}
QUERY RESULTS
[
  {
    "state": "TX",
    "airport_count": 1845,
    "average_elevation_by_control_tower": [
      {
        "has_control_tower": false,
        "average_elevation": 899.2608453837597
      },
      {
        "has_control_tower": true,
        "average_elevation": 861.2340425531914
      }
    ]
  },
  {
    "state": "CA",
    "airport_count": 984,
    "average_elevation_by_control_tower": [
      {
        "has_control_tower": false,
        "average_elevation": 1130.6059275521404
      },
      {
        "has_control_tower": true,
        "average_elevation": 552.3561643835617
      }
    ]
  }
]
WITH __stage0 AS (
  SELECT
    group_set,
    base."state" as "state__0",
    CASE WHEN group_set=0 THEN
      COUNT(1)
      END as "airport_count__0",
    CASE WHEN group_set=1 THEN
      base."cntl_twr"='Y'
      END as "has_control_tower__1",
    CASE WHEN group_set=1 THEN
      AVG(base."elevation")
      END as "average_elevation__1"
  FROM '../data/airports.parquet' as base
  CROSS JOIN (SELECT UNNEST(GENERATE_SERIES(0,1,1)) as group_set  ) as group_set
  GROUP BY 1,2,4
)
SELECT
  "state__0" as "state",
  MAX(CASE WHEN group_set=0 THEN "airport_count__0" END) as "airport_count",
  COALESCE(LIST({
    "has_control_tower": "has_control_tower__1", 
    "average_elevation": "average_elevation__1"}  ORDER BY  "average_elevation__1" desc NULLS LAST) FILTER (WHERE group_set=1),[]) as "average_elevation_by_control_tower"
FROM __stage0
GROUP BY 1
ORDER BY 2 desc NULLS LAST
LIMIT 2

For more information about fields and how to define them, see the Fields section, or for information on views specifically, see the Views section.

Filtering Sources

Filters can be added as a source extension with a where: clause. These filters apply to any query against the source.

source: flights7 is duckdb.table('../data/flights.parquet')

source: long_sfo_flights is flights7 extend {
  where: origin = 'SFO' and distance > 1000
}

run: long_sfo_flights -> { 
  group_by: destination
  aggregate: flight_count is count()
  limit: 3 
}
QUERY RESULTS
[
  {
    "destination": "ORD",
    "flight_count": 362
  },
  {
    "destination": "DFW",
    "flight_count": 304
  },
  {
    "destination": "MSP",
    "flight_count": 281
  }
]
SELECT 
   base."destination" as "destination",
   COUNT(1) as "flight_count"
FROM '../data/flights.parquet' as base
WHERE (base."origin"='SFO') and (base."distance">1000)
GROUP BY 1
ORDER BY 2 desc NULLS LAST
LIMIT 3

Primary Keys

To be used in with-style joins to other sources, a source must have a primary key specified as an extension.

source: carriers is duckdb.table('../data/carriers.parquet') extend {
  primary_key: code
}

Joins

When sources are joined as part of their definition, queries can reference fields in the joined sources without having to specify the join relationship each time.

source: carriers2 is duckdb.table('../data/carriers.parquet') extend {
  primary_key: code
}

source: flights9 is duckdb.table('../data/flights.parquet') extend {
  join_one: carriers with carrier
  measure: flight_count is count()
}

run: flights9 -> {
  group_by: carriers.nickname
  aggregate: flight_count
  limit: 3
}
QUERY RESULTS
[
  {
    "nickname": "Southwest",
    "flight_count": 88751
  },
  {
    "nickname": "USAir",
    "flight_count": 37683
  },
  {
    "nickname": "American",
    "flight_count": 34577
  }
]
SELECT 
   carriers_0."nickname" as "nickname",
   COUNT(1) as "flight_count"
FROM '../data/flights.parquet' as base
 LEFT JOIN '../data/carriers.parquet' AS carriers_0
  ON carriers_0."code"=base."carrier"
GROUP BY 1
ORDER BY 2 desc NULLS LAST
LIMIT 3

See the Joins section for more information on working with joins.

Renaming Fields

Fields from a source may be renamed in the context of the new source. This is useful when the original name is not descriptive, or has a different meaning in the new context.

source: airports5 is duckdb.table('../data/airports.parquet') extend {
  rename: facility_type is fac_type
}
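
A query against the new source then uses the new name. The following is a minimal sketch, assuming the airports5 source defined above is in scope; the airport_count measure is introduced here only for illustration:

```malloy
// Assumes airports5 (defined above) is in scope.
// The column is referenced by its new name, facility_type.
run: airports5 -> {
  group_by: facility_type
  aggregate: airport_count is count()
}
```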

Limiting Access to Fields

The list of fields available in a source can be limited. This can be done either with accept:, which lists the fields to include (any other field from the source is excluded, i.e. an "allow list"), or with except:, which lists the fields to exclude (any other field is included, i.e. a "deny list"). These cannot be used in conjunction with one another.

In this example, we define airports6 to extend the airports.parquet table, but we limit the included columns to only id, code, city, state, and elevation.

source: airports6 is duckdb.table('../data/airports.parquet') extend {
  accept: id, code, city, state, elevation
}
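
As a sketch of the effect, a query against this source can use any of the accepted fields. This assumes the airports6 source defined above is in scope; the average_elevation measure is introduced here only for illustration:

```malloy
// Assumes airports6 (defined above) is in scope.
// state and elevation are both in the accept list, so they resolve.
run: airports6 -> {
  group_by: state
  aggregate: average_elevation is avg(elevation)
}
```

A reference to a column outside the accept list, such as cntl_twr, would be expected to fail with an undefined-field error, since that column is no longer part of the source.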

Here, we take the opposite approach: instead of specifying which columns to include, we include all columns except c_ldg_rts, aero_cht, and cntl_twr.

source: airports7 is duckdb.table('../data/airports.parquet') extend {
  except: c_ldg_rts, aero_cht, cntl_twr
}