In this project, you'll extend your SQL runtime to support aggregate queries. It is worth 8 points.
When the Spark SQL Parser encounters something that looks like a function, it doesn't try to interpret it directly. Instead, it produces an UnresolvedFunction expression node. You'll need to replace these nodes with expressions that actually implement the function.
Like most databases, Spark maintains a "Function Registry", a catalog of all functions and their implementations. All of the "built-in" functions are provided in FunctionRegistry.builtin. Here's a little snippet you can use to replace functions. It doesn't support everything, but will be sufficient for this project.
case UnresolvedFunction(name, arguments, isDistinct, filter, ignoreNulls) =>
  {
    val builder =
      FunctionRegistry.builtin
        .lookupFunctionBuilder(name)
        .getOrElse {
          throw new RuntimeException(
            s"Unable to resolve function `${name}`"
          )
        }
    builder(arguments) // returns the replacement expression node.
  }
FunctionRegistry.lookupFunctionBuilder returns a 'builder' function. When called on the arguments of the UnresolvedFunction, the builder function returns an expression that implements the function. For example, looking up "regexp_extract" in the registry returns a function that, when called on two string-typed expressions and a literal integer, will return a RegExpExtract object.
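To make the builder lookup concrete, here is a minimal sketch of the regexp_extract example described above. It assumes a Spark 3.x spark-catalyst jar on the classpath and a registry that is keyed by FunctionIdentifier; the object name BuilderLookupSketch and the helper resolve are hypothetical and not part of the project skeleton.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpExtract}

object BuilderLookupSketch {
  // Hypothetical helper: look up a built-in function by name and apply
  // its builder to the argument expressions.
  def resolve(name: String, arguments: Seq[Expression]): Expression = {
    val builder =
      FunctionRegistry.builtin
        .lookupFunctionBuilder(FunctionIdentifier(name))
        .getOrElse {
          throw new RuntimeException(s"Unable to resolve function `$name`")
        }
    builder(arguments)
  }

  def main(args: Array[String]): Unit = {
    // Two string-typed expressions and a literal integer, as in the text.
    val expr = resolve(
      "regexp_extract",
      Seq(Literal("abba"), Literal("a(b+)a"), Literal(1))
    )
    // Report which expression class the builder produced.
    println(expr.getClass.getSimpleName)
    println(expr.isInstanceOf[RegExpExtract])
  }
}
```

In the project itself the arguments come straight from the UnresolvedFunction node rather than from hand-built literals.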
Because the Spark SQL Parser doesn't try to resolve functions, it is incapable of distinguishing between normal functions:
SELECT regexp_extract(A, "a(b+)a", 1) FROM R
and aggregate functions:
SELECT sum(A) FROM R
Both of these will parse into a LogicalPlan topped with a Project node.
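One way to see this for yourself is to parse both queries and inspect the root of each plan. The sketch below assumes Spark 3.x and uses CatalystSqlParser directly, which may differ from how your project invokes the parser; ParseShapeSketch and parsesToProject are hypothetical names.

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.Project

object ParseShapeSketch {
  // True when the parser's output plan has a Project node at its root.
  def parsesToProject(sql: String): Boolean =
    CatalystSqlParser.parsePlan(sql).isInstanceOf[Project]

  def main(args: Array[String]): Unit = {
    val queries = Seq(
      """SELECT regexp_extract(A, "a(b+)a", 1) FROM R""",
      "SELECT sum(A) FROM R"
    )
    // Print the root-node check for the normal and the aggregate query.
    for (sql <- queries) {
      println(s"$sql -> Project at root: ${parsesToProject(sql)}")
    }
  }
}
```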
While not required, you might find it easier to work with the resulting plans if you replace these Project nodes with actual Aggregate plan nodes. Look for Project nodes whose projectList contains any expression that is, or contains, an instance of a subclass of AggregateFunction.
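A rewrite along these lines might look like the sketch below. It assumes Spark 3.x, where Aggregate takes (groupingExpressions, aggregateExpressions, child), and it assumes UnresolvedFunction nodes have already been replaced so that AggregateFunction instances are visible in the plan. AggregateRewriteSketch, hasAggregate, and rewrite are hypothetical names.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

object AggregateRewriteSketch {
  // True if an AggregateFunction appears anywhere in the expression tree,
  // e.g. directly or nested under an Alias or an arithmetic operator.
  def hasAggregate(e: Expression): Boolean =
    e.find(_.isInstanceOf[AggregateFunction]).isDefined

  // Replace each Project that computes an aggregate with an Aggregate node.
  // The grouping list is empty because only queries without a GROUP BY
  // clause are expected to reach this case as a Project.
  def rewrite(plan: LogicalPlan): LogicalPlan =
    plan.transformUp {
      case Project(projectList, child) if projectList.exists(hasAggregate) =>
        Aggregate(Seq.empty, projectList, child)
    }
}
```

Using transformUp means nested subqueries in the FROM clause get the same treatment as the top-level query.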
AggregateFunctions are unevaluable, because they don't get evaluated on a single row. Instead, there are several methods on an AggregateFunction that describe how to initialize an accumulator (what Spark calls an AggregationBuffer), how to incorporate input rows into it, and how to extract a final result value from the buffer.
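The initialize / update / extract life cycle can be illustrated without Spark at all. The sketch below is a deliberately simplified model in plain Scala: SimpleAggregate, SumAgg, AvgAgg, and AccumulatorSketch are hypothetical names, not Spark's interfaces, and it ignores NULL handling (for instance, SQL's SUM over zero rows is NULL rather than 0).

```scala
// A simplified, hypothetical model of an aggregate; not Spark's actual interface.
trait SimpleAggregate[B, R] {
  def initial: B                          // create an empty accumulator
  def update(buffer: B, input: Double): B // fold one input value into it
  def result(buffer: B): R                // extract the final value
}

object SumAgg extends SimpleAggregate[Double, Double] {
  def initial: Double = 0.0
  def update(buffer: Double, input: Double): Double = buffer + input
  def result(buffer: Double): Double = buffer
}

// AVG needs two buffer slots: a running sum and a row count.
object AvgAgg extends SimpleAggregate[(Double, Long), Option[Double]] {
  def initial: (Double, Long) = (0.0, 0L)
  def update(buffer: (Double, Long), input: Double): (Double, Long) =
    (buffer._1 + input, buffer._2 + 1)
  def result(buffer: (Double, Long)): Option[Double] =
    if (buffer._2 == 0) None else Some(buffer._1 / buffer._2)
}

object AccumulatorSketch {
  // Evaluate one aggregate per group by keeping one buffer per group key.
  def groupBy[K, B, R](rows: Seq[(K, Double)], agg: SimpleAggregate[B, R]): Map[K, R] = {
    val buffers = scala.collection.mutable.LinkedHashMap.empty[K, B]
    for ((key, value) <- rows) {
      val current = buffers.getOrElse(key, agg.initial)
      buffers(key) = agg.update(current, value)
    }
    buffers.map { case (k, b) => k -> agg.result(b) }.toMap
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("A", 1.0), ("B", 4.0), ("A", 3.0))
    println(groupBy(rows, SumAgg))
    println(groupBy(rows, AvgAgg))
  }
}
```

A query with no GROUP BY clause is the special case where every row maps to the same single group.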
The AggregateFunction can be an instance of either:
The following methods are relevant:
TPC-H is a standard database benchmark. The benchmark consists of a dataset generator and 22 standard query templates. This checkpoint uses three queries based on TPC-H Queries 1, 3, 5, 6, 10, 11, 12, and 14. The dataset generator and template values can be found at the TPC-H website; the dataset is generated at scaling factor (SF) 0.1. Minor variations in the queries may be made. The queries have been rewritten slightly to make them easier to analyze.
SELECT LINEITEM.RETURNFLAG,
       LINEITEM.LINESTATUS,
       SUM(LINEITEM.QUANTITY) AS SUM_QTY,
       SUM(LINEITEM.EXTENDEDPRICE) AS SUM_BASE_PRICE,
       SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS SUM_DISC_PRICE,
       SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)*(CAST(1.0 as float)+LINEITEM.TAX)) AS SUM_CHARGE,
       AVG(LINEITEM.QUANTITY) AS AVG_QTY,
       AVG(LINEITEM.EXTENDEDPRICE) AS AVG_PRICE,
       AVG(LINEITEM.DISCOUNT) AS AVG_DISC,
       COUNT(*) AS COUNT_ORDER
FROM LINEITEM
WHERE LINEITEM.SHIPDATE <= DATE '1998-10-01'
GROUP BY LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS
ORDER BY LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS

SELECT LINEITEM.ORDERKEY,
       SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS REVENUE,
       ORDERS.ORDERDATE,
       ORDERS.SHIPPRIORITY
FROM CUSTOMER, ORDERS, LINEITEM
WHERE CUSTOMER.MKTSEGMENT = 'BUILDING'
  AND CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND ORDERS.ORDERDATE < DATE '1995-03-15'
  AND LINEITEM.SHIPDATE > DATE '1995-03-15'
GROUP BY LINEITEM.ORDERKEY, ORDERS.ORDERDATE, ORDERS.SHIPPRIORITY
ORDER BY REVENUE DESC, ORDERDATE
LIMIT 10

SELECT NATION.NAME,
       SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE
FROM REGION, NATION, CUSTOMER, ORDERS, LINEITEM, SUPPLIER
WHERE CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND LINEITEM.SUPPKEY = SUPPLIER.SUPPKEY
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
  AND SUPPLIER.NATIONKEY = NATION.NATIONKEY
  AND NATION.REGIONKEY = REGION.REGIONKEY
  AND REGION.NAME = 'ASIA'
  AND ORDERS.ORDERDATE >= DATE '1994-01-01'
  AND ORDERS.ORDERDATE < DATE '1995-01-01'
GROUP BY NATION.NAME
ORDER BY REVENUE DESC

SELECT SUM(LINEITEM.EXTENDEDPRICE*LINEITEM.DISCOUNT) AS REVENUE
FROM LINEITEM
WHERE LINEITEM.SHIPDATE >= DATE '1994-01-01'
  AND LINEITEM.SHIPDATE < DATE '1995-01-01'
  AND LINEITEM.DISCOUNT > CAST(0.05 AS float)
  AND LINEITEM.DISCOUNT < CAST(0.07 as float)
  AND LINEITEM.QUANTITY < CAST(24 AS float)

SELECT CUSTOMER.CUSTKEY,
       SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE,
       CUSTOMER.ACCTBAL,
       NATION.NAME,
       CUSTOMER.ADDRESS,
       CUSTOMER.PHONE,
       CUSTOMER.COMMENT
FROM CUSTOMER, ORDERS, LINEITEM, NATION
WHERE CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
  AND ORDERS.ORDERDATE >= DATE '1993-10-01'
  AND ORDERS.ORDERDATE < DATE '1994-01-01'
  AND LINEITEM.RETURNFLAG = 'R'
  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
GROUP BY CUSTOMER.CUSTKEY, CUSTOMER.ACCTBAL, CUSTOMER.PHONE, NATION.NAME, CUSTOMER.ADDRESS, CUSTOMER.COMMENT
ORDER BY REVENUE ASC
LIMIT 20

SELECT PK_V.PARTKEY,
       PK_V.VALUE
FROM (
  SELECT PS.PARTKEY,
         SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS,
       SUPPLIER S,
       NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
  GROUP BY PS.PARTKEY
) PK_V, (
  SELECT SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
  FROM PARTSUPP PS,
       SUPPLIER S,
       NATION N
  WHERE PS.SUPPKEY = S.SUPPKEY
    AND S.NATIONKEY = N.NATIONKEY
    AND N.NAME = 'GERMANY'
) CUTOFF_V
WHERE PK_V.VALUE > (CUTOFF_V.VALUE * CAST(0.0001 AS double) / CAST(100.0 AS double))
ORDER BY PK_V.VALUE DESC

SELECT LINEITEM.SHIPMODE,
       SUM(CASE WHEN ORDERS.ORDERPRIORITY = '1-URGENT'
                  OR ORDERS.ORDERPRIORITY = '2-HIGH'
                THEN 1
                ELSE 0 END) AS HIGH_LINE_COUNT,
       SUM(CASE WHEN ORDERS.ORDERPRIORITY <> '1-URGENT'
                 AND ORDERS.ORDERPRIORITY <> '2-HIGH'
                THEN 1
                ELSE 0 END) AS LOW_LINE_COUNT
FROM LINEITEM, ORDERS
WHERE ORDERS.ORDERKEY = LINEITEM.ORDERKEY
  AND (LINEITEM.SHIPMODE='MAIL' OR LINEITEM.SHIPMODE='SHIP')
  AND LINEITEM.COMMITDATE < LINEITEM.RECEIPTDATE
  AND LINEITEM.SHIPDATE < LINEITEM.COMMITDATE
  AND LINEITEM.RECEIPTDATE >= DATE '1994-01-01'
  AND LINEITEM.RECEIPTDATE < DATE '1995-01-01'
GROUP BY LINEITEM.SHIPMODE
ORDER BY LINEITEM.SHIPMODE

SELECT
  CAST(100.00 AS double)
    * PROMO_ONLY
    / ALL_REVENUE
    AS PROMO_REVENUE
FROM (
  SELECT
    SUM(
      CASE WHEN PART.TYPE LIKE 'PROMO%'
           THEN LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
           ELSE cast(0 as float)
      END
    ) AS PROMO_ONLY,
    SUM(
      LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
    ) AS ALL_REVENUE
  FROM
    LINEITEM,
    PART
  WHERE
    LINEITEM.PARTKEY = PART.PARTKEY
    AND LINEITEM.SHIPDATE >= DATE '1995-09-01'
    AND LINEITEM.SHIPDATE < DATE '1995-10-01'
) AGGREGATE

This page last updated 2025-08-14 19:13:26 -0400