In this project, you'll implement an unoptimized SQL runtime. It is worth 10 points
resolvers += "MimirDB" at "https://maven.mimirdb.info/" libraryDependencies += "edu.buffalo.cse.odin" %% "catalyzer" % "3.0"Note that unlike Spark itself, all code in Catalyzer is fair game for this class, and reviewing it will not count as a violation of Academic integrity.
import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ... def parseSql(sql: String): LogicalPlan = new SparkSqlParser().parsePlan(sql) ...Note that parsePlan will parse both queries (SELECT) and DDL/DML (e.g., CREATE TABLE).
case class CreateTableStatement( tableName: Seq[String], tableSchema: StructType, partitioning: Seq[Transform], bucketSpec: Option[Object], properties: Map[String, String], provider: Option[String], options: Map[String, String], location: Option[String], comment: Option[String], serde: Option[SerdeInfo], external: Boolean, ifNotExists: Boolean) extends ParsedStatement
This is a lot to take in, but for our purposes here, the only three parts of the CreateTableStatement class that we care about are the tableName, tableSchema, and location. The table name may seem a bit weird, as it's a sequence and not a string. This is to manage multi-part table names used to reference tables in a specific schema (e.g., dataSource.tableName). For our purposes, you can always expect this sequence to contain exactly one element.
The second part is the tableSchema. Recall that tables are collections of tuples. Spark refers to tuple types as StructTypes. A StructType's elements are StructFields, which include the field's name, its data type, and some other metadata like whether the field is allowed to be null.
Finally, we have the location. Unlike a CREATE TABLE command in a normal relational database, here we're defining what's called an "external table". We're defining a schema over a remote file so that the database can access its contents. The location field will point at a (relative) path to a data file with the contents of the table. For example:
CREATE TABLE R(id int, fruit string) USING csv OPTIONS(path 'data/R.csv', delimiter '|')The corresponding file located at data/R.data might contain:
1|apple 2|banana 3|clementine 4|duran
You'll note that the provider field will be set to Some("csv") and the options field will be Map("delimiter" -> "|"). Feel free to implement these as you see fit, but they will not change for this project. As in the example above, the data file itself will be CSV-like, with one record per line (\n-delimited), and fields in human-readable strings (|-delimited).
Any other LogicalPlan returned by parsePlan will be a query. You'll need to evaluate this query and return results in a format identical to the data files above. If you need some help debugging, your output should be identical to (modulo row order) the output produced by Sqlite3 for the same query on the same data.
Although you will not be graded on the specific implementation strategy you pick, the following is one relatively straightforward approach to getting an A that closely mirrors how Spark itself is implemented. First though, let's talk a little about how Spark executes queries. Broadly, Spark queries go through five phases:
For this class, we'll be taking a slightly simpler approach to the latter three steps, as they are intended for a large-scale distributed system. For checkpoint 1, you can also still get an A without completing the optimization step (we'll come back to that in checkpoint 2). That leaves us with basically three steps:
Shortly after parsing, Spark applies a two-part process called analysis:
Spark's validation logic is mostly intact, but you'll need to implement the Resolution step yourself. In particular, Spark's SQL Parser leaves behind "placeholder" nodes in both the Expression and LogicalPlan ASTs, whenever the user references something by a string. Normally, the analysis step replaces these placeholders with something that can actually be used. Placeholders that you can expect to encounter are listed below.
Before we discuss placeholders, we need to take a brief digression to explain the exprId field in many Expression AST nodes. For example:
case class AttributeReference( name: String, dataType: DataType, nullable: Boolean = true, override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, val qualifier: Seq[String] = Seq.empty[String]) extends Attribute with Unevaluable {
In most node types, the exprId field is allocated automatically with a fresh identifier (i.e., using NamedExpression.newExprId) when the node is created. Spark uses exprIds internally to keep track of which expressions line up with which other expressions. Two Attributes are the same if and only if they have the same exprId (whether their names are the same or different does not matter).
Both LogicalPlan and Expression provide a transform (and transformUp and transformDown) method to aid with rewriting ASTs. These methods make it very easy to replace parts of the tree. For example, to compute a new tree with UnresolvedRelation nodes replaced, you might write
plan.transform { case UnresolvedRelation(Seq(tableName)) => /* write your replacement here */ }
The other thing to discuss before we move on is resolution. Expression provides a resolved method that checks to see whether the expression has been fully resolved. The dataType method will not work until resolved returns true. resolved, in particular checks for three things:
The last condition is especially tricky, but you can call e.checkInputDataTypes() on each node of the tree to check for errors (see below).
That all being said, here are unresolved nodes you can expect to encounter:
case class UnresolvedRelation( nameElements: Seq[String], options: CaseInsensitiveStringMap, isStreaming: Boolean)
This class is used when a relation is referenced in a LogicalPlan in SQL (typically the FROM clause of a SELECT). The nameElements field encodes the '.'-separated elements of the table name (e.g., foo.bar would be encoded as Seq("foo", "bar")). Under typical use, this sequence will always have only one element. Name elements are case-insensitive.
Occurrences of this class will need to be replaced during analysis with an AST node that knows what attributes the corresponding table has. Spark has several built-in LogicalPlan nodes that can be used to encode tables, but you might find it easier to just create your own subclass of LeafNode to represent a table node. A subclass of LeafNode only needs to implement one field:
case class ____(____) extends LeafNode { val output: AttributeSet = ??? }Note that AttributeSet is a subclass of Seq[Attribute]. In general, the output field should be given as a sequence of AttributeReferences (see above).
case class UnresolvedStar(target: Option[Seq[String]])
Any asterisk * appearing in a SQL is translated into this class. Generally, this happens in three places:
The first two cases (the only two we care about in this checkpoint) are special, as they both represent multiple fields. You'll need to expand these out during the analysis phase. Note that like UnresolvedRelation, table names are Sequences of .-separated strings.
case class UnresolvedAttribute(nameParts: Seq[String])
An attribute name that hasn't been "wired up" with an exprId. In general, there are two cases that need to be handled during Analysis:
For resolving attributes, keep in mind that each operator (that has been resolved already) knows its output schema (typically computing it from the input schema):
val attributes: AttributeSet = source.outputAs above, note that AttributeSet is a subclass of Seq[Attribute] In general, you can expect the contents of this sequence to consist of:
One additional thing that may be helpful in resolving UnresolvedAttributes is that AttributeReference has a qualifiers field Spark uses to store the table name. This field is automatically managed in nested subqueries, but keep in mind that if you're using a custom table class (as suggested above), you will need to set this field yourself when declaring the table's output there.
class MyIterator extends Iterator[MyFoo] { def hasNext: Boolean = /* return true if there are more elements */ def next: MyFoo = /* assemble and return the MyFoo instance */ }For example, here's a simple one that iterates over a range of values.
class Range(low: Int, high: Int) extends Iterator[Int] { var current = low def hasNext = current < high def next: Int = { val ret = current; current += 1; return ret } }In addition to the base table class you created above, you'll be expected to support the following `LogicalPlan` node types:
For each operator, look at the operator in isolation. Imagine that you have an iterator over its input(s). Forget about the full stack of plan nodes and focus on implementing each relational operator in terms of its inputs. Also keep in mind that some operators are just there as decorations (e.g., SubqueryAlias).
It will be helpful (particularly for Project and Filter) to have a way to evaluate primitive-valued expressions. Fortunately, Expression already has a method for this: eval
def eval(input: InternalRow): Any = ???For example:
val row = InternalRow.fromSeq( Seq(1, 2, "bob") ) val literal = expression.eval(row)
Specifically eval takes an org.apache.spark.sql.catalyst.InternalRow as input and produces an Any as output (analogous to Java's Object). InternalRow is a very thin wrapper around an Array[Any]. The InternalRow argument will be passed down through the tree and evaluated.
Observe that some node types are marked as Unevaluable. These will need to be replaced before you call eval. In particular, AttributeReference is unevaluable because it references the attribute symbolically (by expreId), while InternalRow doesn't let you look up attributes this way. An easy way to fix this is to create your own subclass of Expression that instead references the attribute by its position in the InternalRow.
Since InternalRow is already used by Spark, you may wish to use it as the content type for your Iterators as well.
This page last updated 2024-09-19 13:18:43 -0400