Besides being remarkably stable, performant and reliable, PostgreSQL offers a plethora of attractive features, both well known and obscure, along with meticulously maintained source code and documentation, and a bountiful ecosystem to boot. That is why, at its core, Splitgraph is based on PostgreSQL. One particular feature that stands out, and in our opinion has been underutilized, is foreign data wrappers (FDWs).
As we previously elaborated, FDWs enable SQL to serve as a "lingua franca" of sorts, and make any remote data source queryable by the user locally, whether it's "SQL-native" (PostgreSQL, MySQL, SQLite, Snowflake, etc.) or not (MongoDB, Redis, Elasticsearch, CSVs in S3, etc.). In other words, if it has an API you can probably make an FDW for it.
The gold standard for all FDW implementations is postgres_fdw, the native PostgreSQL FDW, which illustrates the full scope of their capabilities. Besides basic read and write operations, this also includes support for handling non-trivial SQL constructs in a clever way, namely pushdown of join and aggregation queries.
In this article, the first in a series of posts (check out parts II and III), we start to describe how we implemented aggregation pushdown in a number of FDWs that we employ, all based on a common framework named Multicorn.
Writing an FDW implementation from scratch is not exactly trivial, and neither is adding aggregation pushdown capability to one. Canonically, this involves writing a PostgreSQL extension in C which, despite being a surprisingly smooth experience, can still be quite laborious and error-prone.
Enter Multicorn. On the one hand, it provides a high-level Python interface in which the last-mile data wrangling specific to each FDW implementation can be performed. On the other hand, its shared core hooks into the low-level FDW API internally, so that users need not worry about it.
To give a concrete example, let's focus on a working toy FDW written with Multicorn, one in which we query a Pandas data frame. For the sake of drama, let's pretend this data source lives on a remote server, somewhere across the network:
import pandas as pd
from multicorn import ForeignDataWrapper

df = pd.DataFrame({
    "number": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
})


def fake_remote_pandas_endpoint(columns):
    return df[columns].to_dict("records")


class PandasFdw(ForeignDataWrapper):
    def execute(self, quals, columns):
        for row in fake_remote_pandas_endpoint(columns):
            yield row
        return
Now, after we instantiate the FDW:
CREATE SERVER pandas_server FOREIGN DATA WRAPPER multicorn OPTIONS ( wrapper 'multicorn.pandasfdw.PandasFdw');
CREATE FOREIGN TABLE pandas_table(number int, parity character varying(4)) SERVER pandas_server;
we can go ahead and actually query it:
sgr@localhost:splitgraph> SELECT number, parity FROM pandas_table
+--------+--------+
| number | parity |
|--------+--------|
|      0 | even   |
|      1 | odd    |
|      2 | even   |
|      3 | odd    |
|      4 | even   |
|      5 | odd    |
|      6 | even   |
|      7 | odd    |
|      8 | even   |
|      9 | odd    |
+--------+--------+
In fact, we can do aggregations too:
sgr@localhost:splitgraph> SELECT count(number) FROM pandas_table
+-------+
| count |
|-------|
|    10 |
+-------+
There is only one issue however, as evidenced by the execute API: Multicorn doesn't support the aggregation pushdown mechanism. In particular, without aggregation pushdown the query sent to the remote amounts to a full table scan, and the aggregation is performed locally by PostgreSQL itself, only after fetching all the rows. Needless to say, this is sub-optimal: in real scenarios aggregation outputs are most often orders of magnitude smaller than the full size of the underlying data, and at the extreme reduce to a single row.
Moreover, this impacts not only speed; it can also end up being quite costly due to the substantial egress fees of many cloud providers, all while neglecting the built-in (and likely optimized) aggregation capabilities of the remote data source. Given that we had already customized the Multicorn repository to support Splitgraph's column-oriented nature, it made sense to try and implement this functionality ourselves.
Generally speaking, an FDW implementation hooks into the planning and execution phases of the query processing flow. In particular, during the planning phase PostgreSQL will consult the FDW implementation for an estimate of the remote data size to be fetched (to aid in optimal plan selection), then ask it to create corresponding foreign access path(s), with each path representing one competing approach that will result in the same output for the given query. Lastly, the FDW is required to generate a plan out of the selected path.
Next, in the execution phase PostgreSQL delegates to the FDW the responsibility of preparing for the scan, iterating through the scan itself, and finally cleaning up any leftover resources. Note that each of the aforementioned steps corresponds to one of the pre-determined callback functions that must be implemented by the FDW.
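In Multicorn, the bulk of these callbacks is implemented once in the shared C core; on the Python side, the planning phase surfaces mainly through the optional get_rel_size method, which supplies the size estimate mentioned above. Here's a minimal sketch for our toy FDW from above (the row-width figure is a crude placeholder):

from multicorn import ForeignDataWrapper

class PandasFdw(ForeignDataWrapper):
    def get_rel_size(self, quals, columns):
        # Planning-phase hook: return (expected row count, average row
        # width in bytes) so the planner can cost the foreign scan path.
        return (len(df), len(columns) * 8)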
In order to move beyond the most basic foreign scan, one needs to start implementing some of the optional advanced callback functions; in the case of aggregation pushdown this is GetForeignUpperPaths (and the contained functions). In PostgreSQL terminology, post-scan/join query constructs, such as aggregations among other things, are known as upper relations, hence the name.
Our own version of GetForeignUpperPaths closely follows that of postgres_fdw itself, albeit in a considerably reduced form and with a long tail of various tweaks in order to meet our needs. It is beyond the scope of this blog post to go through the entire implementation (relevant PRs: #1, #2, #3), so we will give a higher-level overview of how it works.
The main outcome of GetForeignUpperPaths is the addition of a foreign access path whereby the aggregation is pushed down to the remote data source, and which the planner can then (most likely) choose as the cheapest path. However, in producing this outcome there is a problem to be addressed first: can the desired query be executed by the remote in the first place?
We should not push down the aggregation if it cannot be performed on the other side. Think of specific aggregation functions (e.g. jsonb_agg, which aggregates values, including nulls, as a JSON array) or operators (e.g. ~~*, aka ILIKE), which don't necessarily have an equivalent in the remote data source. We must detect any such cases and skip adding the corresponding upper access paths; the aggregation will then have to be performed the hard way, after fetching all the relevant rows first.
We achieve this verification by recursively walking through the expression tree for each expression of the path target, making sure each construct involved is shippable. To get a better sense of the task at hand, let's take a closer look at the relevant upper path target for our simple example. One way to see it is to look at the targetList field of the Query struct with e.g. the debug_print_rewritten option ON (for brevity, we purposefully omit fields irrelevant to our discussion):
(
  {QUERY
  :commandType 1
  ⋮
  :hasAggs true
  ⋮
  :targetList (
    {TARGETENTRY
    :expr
      {AGGREF
      :aggfnoid 2147
      :aggtype 20
      ⋮
      :args (
        {TARGETENTRY
        :expr
          {VAR
          :varno 1
          :varattno 1
          :vartype 23
          ⋮
          }
        ⋮
        }
      )
      ⋮
      }
    ⋮
    :resname count
    ⋮
    }
  )
  ⋮
  }
)
There are a number of things worth noting. Notably, commandType equal to 1 corresponds to a SELECT statement, while hasAggs is true due to the count invocation. More importantly, note that the expression list has one element, consisting of two nested nodes, namely Aggref and Var, each of which is encapsulated in a TargetEntry struct. These in turn contain further useful information.
The Aggref's aggfnoid holds the Oid of the function, from which we can extract the function name that we want to pass to the PandasFdw. Note that we cannot rely on resname of the target entry, since this refers only to the output column name, and defaults to count simply because we haven't used a column alias in this case. Also important is the aggtype field, which denotes the type of the output (in this case INT8OID, aka bigint); in some cases we must apply type coercion if the remote data source does not comply (e.g. Elasticsearch returning a float for aggregations over an integer field).
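By way of illustration, such a coercion could be a small helper applied before yielding rows back to Multicorn (hypothetical code, not part of the toy FDW):

def _coerce_agg_output(value, pg_type):
    # Hypothetical helper: PostgreSQL expects e.g. a bigint (INT8OID)
    # result for count over an integer column, whereas a remote such as
    # Elasticsearch may return a float, so cast before yielding the row.
    if pg_type in ("bigint", "integer"):
        return int(value)
    return value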
The Var's varno, in turn, can determine whether the variable belongs to a foreign table at all, while vartype represents the type of the column itself (INT4OID, aka integer). Finally, varattno corresponds to the position of the column in the table, and can be used to fetch the actual column name, another data point to be passed into our PandasFdw.
Having established that we only need Aggref and Var nodes simplifies the expression-walking algorithm significantly. We've also identified where we can obtain the data that needs to be passed to PandasFdw, namely the function name and the column name. We will pass those through a keyword argument aggs, in the following form:
{
    "count.number": {
        "column": "number",
        "function": "count"
    },
    ...
}
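As an aside, here is a rough Python model of the shippability walk described above (the real check is C code operating on PostgreSQL's node structs; the dict-based tree and names below are purely illustrative):

SUPPORTED_AGG_FUNCTIONS = {"min", "max", "sum", "avg", "count"}

def is_shippable(node):
    # Recursively verify that an expression tree consists solely of
    # constructs the remote can handle: Aggref nodes invoking a supported
    # function, with plain Vars (i.e. columns) as their arguments.
    if node["tag"] == "Var":
        return True
    if node["tag"] == "Aggref":
        return (node["function"] in SUPPORTED_AGG_FUNCTIONS
                and all(is_shippable(arg) for arg in node["args"]))
    return False  # any other construct aborts the pushdown

# The count(number) target from the query tree above:
target = {"tag": "Aggref", "function": "count",
          "args": [{"tag": "Var", "column": "number"}]}
assert is_shippable(target)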
This is also where the peculiarities of Multicorn relative to other FDW implementations manifest themselves. Since Multicorn is devised to be a generic FDW framework, one does not know ahead of time which functions and operators a given remote supports.
In order to help determine that, we will add a specific method named can_pushdown_upperrel to our PandasFdw class, which will be invoked during a GetForeignUpperPaths call. For now this method only needs to provide Multicorn with a list of supported aggregation functions that can safely be propagated.
Let us look at what PandasFdw looks like with support for aggregations added:
import json

import pandas as pd
from multicorn import ForeignDataWrapper

df = pd.DataFrame({
    "number": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
})


def fake_remote_pandas_endpoint(columns, aggs=None):
    if aggs is not None:
        # Returns {"column_1": {"avg": x, "sum": y}, "column_2": {"min": z}, ..}
        return df.agg(aggs).to_dict()
    return df[columns].to_dict("records")


def _convert_aggs_arg(aggs):
    # Convert aggs in accordance with Pandas API:
    # {"column_1": ["avg", "sum"], "column_2": ["min"], ..}
    pandas_aggs = {}
    for agg_props in aggs.values():
        if agg_props["column"] not in pandas_aggs:
            pandas_aggs[agg_props["column"]] = [agg_props["function"]]
        else:
            pandas_aggs[agg_props["column"]].append(agg_props["function"])

    return pandas_aggs


class PandasFdw(ForeignDataWrapper):
    def can_pushdown_upperrel(self):
        return {
            "agg_functions": ["min", "max", "sum", "avg", "count"]
        }

    def explain(self, quals, columns, aggs=None, verbose=False):
        return [
            f"columns: {columns}",
            f"aggs: {json.dumps(aggs, indent=4)}"
        ]

    def execute(self, quals, columns, aggs=None):
        if aggs is not None:
            pandas_aggs = _convert_aggs_arg(aggs)
            row = fake_remote_pandas_endpoint(columns, pandas_aggs)

            # Convert result back to Multicorn API:
            # {"column_1.avg": x, "column_1.sum": y, "column_2.min": z, ...}
            result = {}
            for agg_name, agg_props in aggs.items():
                result[agg_name] = row[agg_props["column"]][agg_props["function"]]

            yield result
        else:
            for row in fake_remote_pandas_endpoint(columns):
                yield row
        return
A couple of important generic observations here:

- there is a new helper function _convert_aggs_arg, which simply adapts the aggs argument to the format our Pandas endpoint accepts
- can_pushdown_upperrel returns a dict whose sole element is the list of supported functions, under the key agg_functions
- there is a new explain method, which can shed light on the plan chosen by Postgres: if the aggs argument is not None, then Postgres opted for the pushdown
- aggs is an optional keyword argument of execute, defaulting to None, in which case execute falls back to the plain scan from before
Now we can confirm that aggregation pushdown is chosen as the optimal path:
sgr@localhost:splitgraph> EXPLAIN SELECT count(number) FROM pandas_table
+------------------------------------------------+
| QUERY PLAN                                     |
|------------------------------------------------|
| Foreign Scan  (cost=1.00..1.00 rows=1 width=1) |
| Multicorn: columns: ['number']                 |
| Multicorn: aggs: {                             |
|     "count.number": {                          |
|         "function": "count",                   |
|         "column": "number"                     |
|     }                                          |
| }                                              |
+------------------------------------------------+
sgr@localhost:splitgraph> SELECT count(number) FROM pandas_table
+-------+
| count |
|-------|
|    10 |
+-------+
A proper aggs object was passed down, which means that PostgreSQL did in fact decide to go for an aggregation pushdown. However, if we try a combo of other aggregation functions:
sgr@localhost:splitgraph> SELECT min(number), max(number), sum(number), avg(number)
FROM pandas_table
Error in python: AttributeError
DETAIL: 'avg' is not a valid function for 'Series' object
we get an error. The problem arises because we propagate the Postgres average function avg verbatim, instead of using the Pandas-native average. Here's a straightforward fix:
@@ -13,16 +13,25 @@ def fake_remote_pandas_endpoint(columns, aggs=None):
     return df[columns].to_dict("records")
 
+_PG_TO_PANDAS_FUNC_MAP = {
+    "min": "min",
+    "max": "max",
+    "sum": "sum",
+    "avg": "average",
+    "count": "count",
+}
+
 def _convert_aggs_arg(aggs):
     # Convert aggs in accordance with Pandas API:
     # {"column_1": ["avg", "sum"], "column_2": ["min"], ..}
     pandas_aggs = {}
     for agg_props in aggs.values():
+        function_name = _PG_TO_PANDAS_FUNC_MAP[agg_props["function"]]
+
         if agg_props["column"] not in pandas_aggs:
-            pandas_aggs[agg_props["column"]] = [agg_props["function"]]
+            pandas_aggs[agg_props["column"]] = [function_name]
         else:
-            pandas_aggs[agg_props["column"]].append(agg_props["function"])
+            pandas_aggs[agg_props["column"]].append(function_name)
 
     return pandas_aggs
 
@@ -31,7 +40,7 @@ class PandasFdw(ForeignDataWrapper):
 
     def can_pushdown_upperrel(self):
         return {
-            "agg_functions": ["min", "max", "sum", "avg", "count"]
+            "agg_functions": list(_PG_TO_PANDAS_FUNC_MAP)
         }
 
     def explain(self, quals, columns, aggs=None, verbose=False):
Trying out the same query with the above changes gives us the correct result:
sgr@localhost:splitgraph> SELECT min(number), max(number), sum(number), avg(number)
FROM pandas_table
+-----+-----+-----+-----+
| min | max | sum | avg |
|-----+-----+-----+-----|
|   0 |   9 |  45 | 4.5 |
+-----+-----+-----+-----+
GROUP BY clauses

The next logical step is enabling pushdown of aggregation queries in the broader sense, i.e. ones with a grouping clause:
SELECT parity, count(number) FROM pandas_table GROUP BY parity
Let's take a look at what changed in the query tree:
(
  {QUERY
  :commandType 1
  ⋮
  :hasAggs true
  ⋮
  :targetList (
    {TARGETENTRY
    :expr
      {VAR
      :varno 1
      :varattno 2
      :vartype 1043
      ⋮
      }
    ⋮
    :resname parity
    :ressortgroupref 1
    ⋮
    }
    {TARGETENTRY
    :expr
      {AGGREF
      :aggfnoid 2147
      :aggtype 20
      ⋮
      :args (
        {TARGETENTRY
        :expr
          {VAR
          :varno 1
          :varattno 1
          :vartype 23
          ⋮
          }
        ⋮
        }
      )
      ⋮
      }
    ⋮
    :resname count
    ⋮
    }
  )
  ⋮
  :groupClause (
    {SORTGROUPCLAUSE
    :tleSortGroupRef 1
    ⋮
    }
  )
  ⋮
  }
)
You can see that the expression now contains an additional target with the corresponding Var, which references a SortGroupClause. Again, for any such variables we can extract the corresponding column names and pass them into PandasFdw inside a list named group_clauses:
@@ -7,7 +7,9 @@ df = pd.DataFrame({
     "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
 })
 
-def fake_remote_pandas_endpoint(columns, aggs=None):
+def fake_remote_pandas_endpoint(columns, aggs=None, group_clauses=None):
+    if group_clauses is not None:
+        return df.groupby(group_clauses, as_index=False).agg(aggs).to_dict('records')
     if aggs is not None:
         # Returns {"column_1": {"avg": x, "sum": y}, "column_2": {"min": z}, ..}
         return df.agg(aggs).to_dict()
@@ -40,19 +42,33 @@ class PandasFdw(ForeignDataWrapper):
 
     def can_pushdown_upperrel(self):
         return {
+            "groupby_supported": True,
             "agg_functions": list(_PG_TO_PANDAS_FUNC_MAP)
         }
 
-    def explain(self, quals, columns, aggs=None, verbose=False):
+    def explain(self, quals, columns, aggs=None, group_clauses=None, verbose=False):
         return [
             f"columns: {columns}",
-            f"aggs: {json.dumps(aggs, indent=4)}"
+            f"aggs: {json.dumps(aggs, indent=4)}",
+            f"group_clauses: {group_clauses}"
         ]
 
-    def execute(self, quals, columns, aggs=None):
-        if aggs is not None:
+    def execute(self, quals, columns, aggs=None, group_clauses=None):
+        if group_clauses is not None:
+            pandas_aggs = _convert_aggs_arg(aggs)
+
+            for row in fake_remote_pandas_endpoint(columns, pandas_aggs, group_clauses):
+                # Convert result back to Multicorn API:
+                result = {}
+                for agg_name, agg_props in aggs.items():
+                    result[agg_name] = row[(agg_props["column"], agg_props["function"])]
+
+                for group_clause in group_clauses:
+                    result[group_clause] = row[(group_clause, "")]
+
+                yield result
+        elif aggs is not None:
             pandas_aggs = _convert_aggs_arg(aggs)
             row = fake_remote_pandas_endpoint(columns, pandas_aggs)
Again, a few things worth pointing out:

- groupby_supported is set to True inside can_pushdown_upperrel, signaling that grouping clauses can be pushed down
- explain and execute now take an optional group_clauses argument, with the grouped Pandas output converted back into the row format Multicorn expects
At last, the resulting query will get pushed down:
sgr@localhost:splitgraph> EXPLAIN SELECT parity, count(number) FROM pandas_table
GROUP BY parity
+------------------------------------------------+
| QUERY PLAN                                     |
|------------------------------------------------|
| Foreign Scan  (cost=1.00..1.00 rows=1 width=1) |
| Multicorn: columns: ['parity', 'number']       |
| Multicorn: aggs: {                             |
|     "count.number": {                          |
|         "function": "count",                   |
|         "column": "number"                     |
|     }                                          |
| }                                              |
| Multicorn: group_clauses: ['parity']           |
+------------------------------------------------+
sgr@localhost:splitgraph> SELECT parity, count(number) FROM pandas_table
GROUP BY parity
+--------+-------+
| parity | count |
|--------+-------|
| even   |     5 |
| odd    |     5 |
+--------+-------+
The final code for PandasFdw can be found in our Multicorn fork.
Note that we don't actually handle aggregation statements without aggregation functions correctly, i.e. ones with only GROUP BY clauses, such as SELECT parity FROM pandas_table GROUP BY parity. Although somewhat unusual, these are valid SQL statements, analogous to SELECT DISTINCT ... constructs. While our current implementation doesn't support pushdown of any advanced statements with the DISTINCT keyword, it is straightforward to fix grouping-only query execution, simply by handling the case where aggs is None and group_clauses is not.
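A sketch of such a fix in our toy FDW could prepend one extra branch to execute (here drop_duplicates stands in for a distinct-values query against a real remote):

    def execute(self, quals, columns, aggs=None, group_clauses=None):
        if group_clauses is not None and aggs is None:
            # Grouping-only query, e.g. SELECT parity ... GROUP BY parity:
            # the distinct values of the grouped columns are all we need.
            for row in df[group_clauses].drop_duplicates().to_dict("records"):
                yield row
            return
        # ... the aggregation branches from before follow unchanged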
More importantly, note that we don't do anything with the quals argument in our toy FDW. This argument corresponds to record filtering achieved via the WHERE clause, and in the case of non-aggregation queries pushing it down is merely a nice-to-have: not pushing down quals still results in correct output, albeit wastefully, since Postgres will double-check and re-apply any unapplied WHERE clauses after fetching the rows.
In the case of aggregations, however, quals is crucial, since it determines which records are involved in the aggregation, and therefore impacts not only performance but also the correctness of the result. Our implementation takes care of this by requiring specific FDW implementations to provide a list of supported operators that the corresponding remote data source is familiar with. Multicorn will look these up under the key supported_operators in the output of can_pushdown_upperrel; in its absence, aggregation pushdown is skipped for queries involving WHERE clauses.
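In our toy FDW, advertising operator support would amount to one extra key in can_pushdown_upperrel (the operator list below is purely illustrative; each FDW declares whatever its remote can evaluate):

    def can_pushdown_upperrel(self):
        return {
            "groupby_supported": True,
            "agg_functions": list(_PG_TO_PANDAS_FUNC_MAP),
            # Without this key, Multicorn skips aggregation pushdown for
            # queries that also carry a WHERE clause.
            "supported_operators": ["=", "<", ">", "<=", ">=", "<>"],
        }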
We've demonstrated our work on extending the Multicorn FDW framework with new capabilities, namely aggregation and grouping pushdown. In particular, this was illustrated using a toy FDW model for querying Pandas dataframes via SQL.
In addition, we've provided the general background as well as some deeper insights concerning aggregations in the context of PostgreSQL FDWs. This includes the inner workings of the aggregation pushdown mechanism itself, as well as a discussion of limitations (unsupported functions and operators) and benefits (speed, lower egress costs).
In subsequent installments in this series, we will explore some real-life FDWs utilizing these features, which we have developed and currently use at Splitgraph.