Note (April 2023): In the meantime we've migrated Seafowl's storage layer to use the Delta Lake protocol. While the high-level functionality and feature-set remain the same, some implementation details in this blog post are no longer up to date.
The primary specialty of Seafowl is data-crunching over large, slowly-changing datasets at the edge. It does so by leveraging standard HTTP cache semantics natively in the provided query execution endpoint(s).
The query result then becomes a regular static HTTP asset, eligible for caching at various locations (from the browser to the CDN), thus saving on compute time. Seafowl also utilizes the usual HTTP cache revalidation mechanism, so that when the underlying data does change, the query is re-run in a performant manner (thanks to Rust and Apache DataFusion). In a sense, Seafowl can be seen as a fast, lightweight and self-contained version of the Splitgraph DDN.
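Concretely, the cached flow looks something like this (a sketch against a local instance; at the time of writing, the cached GET endpoint takes the SHA-256 hash of the query text in the URL and the query itself in the X-Seafowl-Query header):

# Compute the hash of the query text
$ QUERY="SELECT 2 * 2"
$ HASH=$(echo -n "$QUERY" | sha256sum | cut -d ' ' -f 1)

# The first request executes the query; the response carries an ETag
$ curl -i -H "X-Seafowl-Query: $QUERY" "http://localhost:8080/q/$HASH"

# A conditional request revalidates the cached result: as long as the
# underlying tables haven't changed, Seafowl responds with 304 Not Modified
$ curl -i -H "X-Seafowl-Query: $QUERY" \
    -H 'If-None-Match: "<etag-from-the-first-response>"' \
    "http://localhost:8080/q/$HASH"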
Despite being an analytical query engine at its core, Seafowl (like the DDN) also supports writing data using DDL (CREATE, ALTER and DROP) as well as DML (INSERT, UPDATE and DELETE) SQL statements. In this blog post we'll
take a look at the interplay between such statements and the way Seafowl chunks and stores the data in partition objects.
We'll also describe how table versioning and query time travel concepts naturally arise in this context.
To start off, let's create a new table with some dummy data after spinning up a Seafowl instance. There are a number of options for getting data into Seafowl, such as pointing a table to a source table from a remote database, or by pointing a table to a local or remote CSV/Parquet file.
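The latter looks roughly like this (a sketch with a placeholder URL; in Seafowl, external tables get mounted into the staging schema):

CREATE EXTERNAL TABLE numbers
STORED AS PARQUET
LOCATION 'https://example.org/some_numbers.parquet';

-- the data is then queryable (or copyable into a regular table) via
SELECT * FROM staging.numbers;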
For this example though let's use the upload endpoint. It is intended for when you can't or don't want to place your data file on the same machine where Seafowl is running, nor spin up a one-off server for serving it over HTTP:
$ cat > some_numbers.csv << EOF
> name,number
> one,1
> two,2
> EOF
$ curl \
-H "Authorization: Bearer write_password" \
-F "data=@some_numbers.csv" \
http://localhost:8080/upload/some/numbers
done
Even though the primary specialty of Seafowl is querying over HTTP, for the sake of conciseness we'll run the rest of the queries in this blog post via a client. Specifically, we'll exploit the fact that Seafowl can also expose a PostgreSQL-compatible endpoint and use psql.
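To connect, something like the following should do (a sketch; the exact port and database name depend on the [frontend.postgres] section of your Seafowl configuration):

$ psql postgres://localhost:6432/seafowl

After that, we can run queries interactively: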
seafowl=> SELECT * FROM some.numbers;
name | number
------+--------
one | 1
two | 2
(2 rows)
seafowl=> SELECT column_name, data_type FROM information_schema.columns
seafowl-> WHERE table_name = 'numbers';
column_name | data_type
-------------+-----------
name | Utf8
number | Int64
(2 rows)
No surprises there; the data was uploaded into a table and the schema got introspected, since we didn't specify one explicitly. Since we've just created it, our table so far has only a single version and a single partition, which in Seafowl are stored as Parquet files. To verify this we can use a couple of system tables that Seafowl provides, which offer insight into the table metadata:
seafowl=> SELECT * FROM system.table_versions;
table_schema | table_name | table_version_id | creation_time
--------------+------------+------------------+---------------------
some | numbers | 1 | 2022-11-18 13:11:24
(1 row)
seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count FROM system.table_partitions;
table_version_id | table_partition_id | object_storage_id | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
1 | 1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
(1 row)
Given that we haven't specified a custom object store, Seafowl defaults to using the local file system to store the objects in the seafowl-data folder:
$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu 854 Nov 18 13:11 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
Our table consists of only one partition file since our example data is tiny; had the uploaded file been sufficiently large (the threshold is governed by the max_partition_size config parameter), Seafowl would have partitioned it into multiple Parquet files.
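For reference, this is roughly how that knob might look in the seafowl.toml config file (a sketch; the section placement and the value shown are assumptions for illustration):

# seafowl.toml
[misc]
# Maximum number of rows per partition; larger inputs get split
# into multiple Parquet objects of at most this many rows each
max_partition_size = 1048576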
INSERT command

When it comes to adding new data via an INSERT statement, Seafowl will create new partition(s) from the input and link them up with the existing partitions of the referenced table to make a new version of it. Hence, each separate invocation generates at least one partition (as does a CREATE TABLE AS statement):
seafowl=> INSERT INTO some.numbers VALUES ('three', 3), ('four', 4), ('five', 5);
...
seafowl=> INSERT INTO some.numbers VALUES ('one', 1), ('two', 2);
seafowl=> SELECT * FROM some.numbers;
name | number
-------+--------
one | 1
two | 2
one | 1
two | 2
three | 3
four | 4
five | 5
(7 rows)
The combined effect of the preceding two INSERT statements is the creation of two new table versions (2 and 3). Each one inherits all the partitions from the previous version:
seafowl=> SELECT * FROM system.table_versions;
table_schema | table_name | table_version_id | creation_time
--------------+------------+------------------+---------------------
some | numbers | 1 | 2022-11-18 13:11:24
some | numbers | 2 | 2022-11-18 13:12:58
some | numbers | 3 | 2022-11-18 13:13:35
(3 rows)
seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count FROM system.table_partitions;
table_version_id | table_partition_id | object_storage_id | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
1 | 1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
2 | 1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
2 | 2 | b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet | 3
3 | 1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
3 | 2 | b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet | 3
3 | 3 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
(6 rows)
However, note that the last INSERT added the same data that was used to create the table initially via the upload endpoint. Therefore, even though our table now has three logical partitions, two of them (table_partition_id 1 and 3) are physically identical, pointing to the same file:
$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu 854 Nov 18 13:13 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
-rw------- 1 ubuntu ubuntu 883 Nov 18 13:12 b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet
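The file names strongly suggest that partition objects are content-addressed, which is what makes this deduplication automatic. Assuming the object_storage_id is the SHA-256 digest of the Parquet file itself (an inference from the matching names above), this is easy to check:

$ sha256sum seafowl-data/073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
# if that assumption holds, the printed digest matches the file's own name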
This example illustrates how this partitioning scheme naturally lends itself to supporting the time travel querying that we'll discuss shortly. In particular, what makes this proposition so appealing is that by storing the objects for the latest/current table version, we're also able to reconstruct prior table versions with minimal additional storage overhead, thanks to partition re-use. In this particular example, three different table versions can be served from just two objects.
UPDATE and DELETE commands

Unlike INSERT, the two remaining DML statements will in general replace some of the existing partitions with new ones. The specifics depend on the exact qualifier used in the WHERE clause, as well as on the statement type:
seafowl=> DELETE FROM some.numbers WHERE number = 3 OR number = 5;
...
seafowl=> UPDATE some.numbers SET name = 'two', number = 2 WHERE number = 1;
seafowl=> SELECT * FROM some.numbers;
name | number
------+--------
four | 4
two | 2
two | 2
two | 2
two | 2
(5 rows)
seafowl=> SELECT * FROM system.table_versions where table_version_id > 3;
table_schema | table_name | table_version_id | creation_time
--------------+------------+------------------+---------------------
some | numbers | 4 | 2022-11-18 13:15:18
some | numbers | 5 | 2022-11-18 13:15:53
(2 rows)
Here, the DELETE statement specifically targets the 2nd partition (created by the first INSERT statement), while leaving the other two partitions (which are actually identical) unchanged and eligible for re-use. The targeted partition is processed, and the rows matching the qualifier are filtered out, leading to a new, in this case smaller, partition (or to no partition at all, if all rows get filtered out). When multiple partitions match the selection, for the sake of optimal partition size we bundle them together, perform the required row filtering and then potentially re-chunk the result into new partitions.
The UPDATE, on the other hand, leaves the total number of rows unchanged, but is likewise subject to the merging of partitions that need to be processed. In the above case, the UPDATE targets the two "twin" partitions, changes two out of their four rows, and merges the result into one final partition:
seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count
seafowl-> FROM system.table_partitions WHERE table_version_id > 3;
table_version_id | table_partition_id | object_storage_id | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
4 | 1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
4 | 3 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet | 2
4 | 4 | 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet | 1
5 | 4 | 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet | 1
5 | 5 | da56d46a18b13f3d67c2816f107a837d41c760c8d67afcc015c334375f5a5e87.parquet | 4
(5 rows)
As a final result we now have a total of four partition objects:
$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu 854 Nov 18 13:13 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
-rw------- 1 ubuntu ubuntu 844 Nov 18 13:15 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet
-rw------- 1 ubuntu ubuntu 883 Nov 18 13:12 b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet
-rw------- 1 ubuntu ubuntu 835 Nov 18 13:15 da56d46a18b13f3d67c2816f107a837d41c760c8d67afcc015c334375f5a5e87.parquet
This partition-scoping mechanism for UPDATE/DELETE ultimately builds on top of the partition pruning that Seafowl uses to reduce the number of objects that need scanning in response to the usual SELECT queries with filters. In turn, partition pruning relies on simple statistics gathered during partition generation, such as the max/min value of each column, as well as the presence/count of null values. Consequently, if a particular column can't have (or simply doesn't have) such statistics collected, partition pruning doesn't work and all table partitions need to be processed.
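To make this concrete with our current table: one partition holds the single row (four, 4), while the other holds four rows whose number values are all 2. A filter like the one below should therefore let Seafowl prune the four-row object outright, since its min/max statistics for number rule it out:

seafowl=> SELECT * FROM some.numbers WHERE number > 3;
 name | number
------+--------
 four |      4
(1 row)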
Having demonstrated how older table versions can be reconstructed from the constituent objects, let's discuss how that looks in practice. First, a bit of background on the way table resolution works in general.
Each query invocation is accompanied by a fresh Seafowl context, which wraps the inner DataFusion context. The wrapped context in turn contains the nested maps of all databases (also referred to as catalogs) -> schemas -> tables, so that it can always determine which particular table implementations are needed for executing a given query. By default, the nested maps are populated with the latest version of each table as of the moment of query invocation.
To support time travel queries, we opted for (ab)using the table function syntax, whereby the target table reference can have a timestamp value specified inside parentheses. For instance, to go back and inspect the initial table contents, or the contents after the two INSERTs (versions 1 and 3, respectively), we can use:
seafowl=> SELECT * FROM some.numbers('2022-11-18 13:13:35');
name | number
-------+--------
one | 1
two | 2
three | 3
four | 4
five | 5
one | 1
two | 2
(7 rows)
seafowl=> SELECT * FROM some.numbers('2022-11-18 13:11:24');
name | number
------+--------
one | 1
two | 2
(2 rows)
This works by Seafowl performing an initial walk over the query AST during the logical planning phase and recording the presence (and values) of any temporal versions used in the query. If none are found, the query proceeds as usual, with the default context table map unchanged.
On the other hand, if Seafowl detects that time travel querying is used for some table(s), it performs a triage for the exact table_version_id required. Once it has that information, it rewrites the affected table names in the AST to unique ones, using the specific table_version_id fetched. Finally, Seafowl updates the context table mapping for each specified table version with the unique name that was used in the AST rewrite. Note that this only involves fetching the table metadata, such as which particular partitions correspond to a given table_version_id. Loading the partitions themselves is done only during the scan phase, once all the optimizations have been performed and we have the table filters handy for partition pruning.
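Conceptually, the rewrite turns a versioned table reference into a plain reference under a unique name; the rewritten name below is purely illustrative, since the actual internal naming scheme isn't covered here:

-- What the user writes
SELECT * FROM some.numbers('2022-11-18 13:11:24');
-- Roughly what the planner sees after the AST rewrite, with the context
-- table map now resolving the unique name to table_version_id 1
SELECT * FROM some.numbers_version_1;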
There are a couple of limitations in Seafowl's time travel implementation that we aim to improve down the line. One example is the lack of support for using time travel in write queries, as the mechanism is only supported for SELECT queries at present. Another would be adding more flexibility to our existing time travel syntax, for instance by supporting relative times (e.g. the table version from exactly 2 days ago) and version offsets (e.g. the table version 2 versions prior to the latest one). Alternatively, we may eventually align it with the AS OF syntax that is gaining more traction in the industry.
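For comparison, that flavor typically reads something like the following (not valid Seafowl syntax at the time of writing, shown purely for illustration):

SELECT * FROM some.numbers AS OF '2022-11-18 13:11:24';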
Nonetheless, the present mechanism is quite robust and versatile, capable of supporting advanced queries, such as CTEs and table JOINs that use multiple different versions of the same table at once.
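For instance, here's a sketch of a single statement that compares the current row count against the initial version, using the timestamps from our example above:

seafowl=> WITH initial AS (SELECT COUNT(*) AS cnt FROM some.numbers('2022-11-18 13:11:24')),
seafowl->      latest AS (SELECT COUNT(*) AS cnt FROM some.numbers)
seafowl-> SELECT latest.cnt - initial.cnt AS delta FROM initial CROSS JOIN latest;
 delta
-------
     3
(1 row)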