From the outset, one of the key tenets of Seafowl's architecture has been the separation of compute and storage, true to its aim of providing fast analytical queries at the edge. On one hand, this enables running Seafowl on thin, distributed and scalable nodes or serverless platforms with scale-to-zero capabilities.
On the other hand, the data itself is kept in Parquet files in dedicated object stores, which provide cheap and voluminous storage capacity on the cloud. Recently, the physical layout of table data was changed to follow Delta Lake semantics, marking a transition away from the custom storage layer we used previously.1 In this blog post we'll take a look at what this means in practice, and along the way demonstrate some basic facts and features of the Delta protocol.
In general, table format refers to the underlying storage arrangement that represents a given table in a given database. In the case of Seafowl, it made the most sense to go with flat storage of Parquet files for a variety of performance and storage benefits, a setup commonly referred to as a data lake.
Initially, we implemented a custom Seafowl data lake table format, which (among other things) involved keeping track of which Parquet file corresponds to which table (version) via a metadata catalog (i.e. a Postgres or SQLite DB). This also meant a lot of custom logic for scanning and partition pruning of tables. However, we soon realized that this niche was being commoditized quickly, and that it would be beneficial to migrate to one of the new standards.
In particular, there are three open and competing table formats that bring an ACID layer to data lakes, one of which will likely become the de-facto industry standard eventually. Besides Delta Lake, there are also Apache Iceberg and Apache Hudi, and while they all offer a variety of advanced features and open-source implementations, we've found that the most mature Rust implementation is that of Delta, via the delta-rs project. It also has an active and welcoming community of maintainers, so we ultimately decided to replace our custom storage layer with delta-rs.
Lastly, note that in conjunction with other components, namely the internal metadata catalog and a SQL-native query engine powered by DataFusion, the combined capabilities of Seafowl make up what is dubbed a Lakehouse.2 This term, popularized by Databricks but gaining industry-wide acceptance, denotes a hybrid of a data lake and a data warehouse.
To demonstrate the new storage layer in action, let's try out a toy example, where we use Seafowl to perform analytical queries on a table that is periodically synced with an external source. First, we'll spin up a Seafowl process:
$ SEAFOWL__FRONTEND__HTTP__WRITE_ACCESS=any ./target/debug/seafowl
2023-04-11T06:58:04.654Z INFO seafowl > Starting Seafowl 0.3.2
2023-04-11T06:58:04.659Z INFO seafowl > Loading the configuration from seafowl.toml
2023-04-11T06:58:04.680Z INFO seafowl > Starting the PostgreSQL frontend on 127.0.0.1:6432
2023-04-11T06:58:04.680Z WARN seafowl > The PostgreSQL frontend doesn't have authentication or encryption and should only be used in development!
2023-04-11T06:58:04.680Z INFO seafowl > Starting the HTTP frontend on 0.0.0.0:8080
2023-04-11T06:58:04.680Z INFO seafowl > HTTP access settings: read any, write any
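The startup log mentions loading the configuration from seafowl.toml. For reference, a minimal config for this kind of local development setup might look roughly like the sketch below; this is illustrative only, so consult the Seafowl configuration docs for the exact section and key names:

# seafowl.toml: an illustrative sketch of a local development setup
[object_store]
type = "local"              # keep table data on the local file system
data_dir = "./seafowl-data"

[catalog]
type = "sqlite"             # the default embedded metadata catalog
dsn = "seafowl.sqlite"

[frontend.http]
bind_host = "0.0.0.0"
bind_port = 8080
read_access = "any"
write_access = "any"        # the SEAFOWL__FRONTEND__HTTP__WRITE_ACCESS env var above overrides this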
With the server up, let's run some queries:
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "CREATE EXTERNAL TABLE seafowl_issue_reactions
STORED AS TABLE
OPTIONS ('name' '\"splitgraph-demo/seafowl:latest\".issue_reactions')
LOCATION 'postgresql://$SPLITGRAPH_API_KEY:$SPLITGRAPH_API_SECRET@data.splitgraph.com:5432/ddn';
CREATE TABLE reactions AS
SELECT content, issue_number, 2022 as year
FROM staging.seafowl_issue_reactions
WHERE date_part('year', created_at) = 2022;
SELECT content, count(*) as count
FROM reactions
GROUP BY content
ORDER BY count DESC"}
EOF
{"content":"+1","count":5}
{"content":"heart","count":3}
{"content":"hooray","count":1}
The above uses Seafowl's POST endpoint, which isn't cache-friendly, but can run multiple write statements sequentially, plus an optional read statement at the end. Here we run three queries: first we create an external table pointing at a dataset on the Splitgraph DDN, then we materialize its 2022 rows into a local reactions table, and finally we aggregate the reaction counts.
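As an aside, for reads that we do want to cache, Seafowl also exposes a GET variant of this endpoint: the URL carries the SHA-256 hash of the query text (so that caches and CDNs can key on it), while the query itself travels in the X-Seafowl-Query header. Roughly like this; see the Seafowl HTTP API docs for the exact contract (e.g. around URL-encoding the header value):

$ QUERY="SELECT content, count(*) AS count FROM reactions GROUP BY content ORDER BY count DESC"
$ HASH=$(echo -n "$QUERY" | shasum -a 256 | cut -d ' ' -f 1)
$ curl -H "X-Seafowl-Query: $QUERY" "http://localhost:8080/q/$HASH"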
Turning our attention to storage, which by default is the local file system, we can see the layout of files that make up this simple table:
$ ls -l seafowl-data/
total 480
drwxr-xr-x 4 markogrujic staff 128 Apr 11 08:58 00c853f9-48b6-4531-881a-f5e1f546ed0e
-rw-r--r-- 1 markogrujic staff 4096 Apr 11 08:58 seafowl.sqlite
$ cd seafowl-data/00c853f9-48b6-4531-881a-f5e1f546ed0e/
$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet

1 directory, 3 files
In Seafowl, each table gets a corresponding UUID, which also serves as the name of the table's root directory, where data files and logs are stored. The root Seafowl data folder (seafowl-data) contains all such table directories (besides seafowl.sqlite, which is the default Seafowl catalog), regardless of the schema/database they belong to. This ensures that any schema or table drops are catalog metadata operations only, and that the actual garbage collection of dropped table folders/files can be done lazily later on.3
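For example, here's a sketch of how that laziness plays out with a hypothetical throwaway table (the exact VACUUM syntax is mentioned in the footnotes):

# Dropping a table only deletes its catalog entry...
$ curl -H "Content-Type: application/json" http://localhost:8080/q \
    -d '{"query": "DROP TABLE throwaway"}'
# ...so its UUID directory is still on disk:
$ ls seafowl-data/
# Garbage collection of such orphaned directories happens lazily:
$ curl -H "Content-Type: application/json" http://localhost:8080/q \
    -d '{"query": "VACUUM DATABASE"}'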
Since the remote table that we created first (seafowl_issue_reactions) is an in-memory entity, we only have one physical table (reactions) in storage (the 00c853f9-48b6-4531-881a-f5e1f546ed0e directory). This table has only one data file for now (the sole .parquet file), as we only inserted a few rows. In the general case it can consist of an arbitrary number of files, dictated by the total input size and the partition chunking configuration.4
As per the Delta protocol, the log entries represent the transactional history of changes to a table. The elementary units are JSON files, each corresponding to a particular table version, and named by monotonically increasing numbers starting from 0. These are used to replay all Delta actions taken on a table in order to reproduce the desired table version. This effectively means knowing which files belong to the table state for a given version, also known as a table snapshot, which is very useful during query planning.
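Since each log entry is just newline-delimited JSON, we can approximate this replay ourselves: the snapshot of the latest version is every add-ed path that hasn't been remove-d since. A quick sketch with jq (valid as long as no checkpoints have been written yet; more on those below):

$ comm -23 \
    <(jq -r '.add.path // empty' _delta_log/*.json | sort) \
    <(jq -r '.remove.path // empty' _delta_log/*.json | sort)
part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet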
In the case of our reactions table, there are two initial log files stemming from the CREATE TABLE reactions AS ... statement. The first one represents the zeroth version of the table, and corresponds to the creation of a blank table along with some metadata (time, name, schema, etc.):
$ jq --sort-keys . _delta_log/00000000000000000000.json
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 1
  }
}
{
  "metaData": {
    "configuration": {},
    "createdTime": 1681196324144,
    "description": "Created by Seafowl version 0.3.2",
    "format": {
      "options": {},
      "provider": "parquet"
    },
    "id": "48659c21-0ecf-4e3c-9168-5e056c6d813d",
    "name": "reactions",
    "partitionColumns": [],
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"content\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"issue_number\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"year\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}"
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "operation": "CREATE TABLE",
    "operationParameters": {
      "location": "file:///Users/markogrujic/Splitgraph/seafowl-data/00c853f9-48b6-4531-881a-f5e1f546ed0e/",
      "metadata": "{\"configuration\":{},\"created_time\":1681196324144,\"description\":\"Created by Seafowl version 0.3.2\",\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"48659c21-0ecf-4e3c-9168-5e056c6d813d\",\"name\":\"reactions\",\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"content\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"issue_number\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"year\",\"nullable\":false,\"type\":\"long\"}],\"type\":\"struct\"}}",
      "mode": "ErrorIfExists",
      "protocol": "{\"minReaderVersion\":1,\"minWriterVersion\":1}"
    },
    "timestamp": 1681196324145
  }
}
The second log entry (version 1) represents the writing of data to the table, and contains a single add Delta action. This action modifies the table state by logically attaching the data file to the new version.
$ jq --sort-keys . _delta_log/00000000000000000001.json
{
  "add": {
    "dataChange": true,
    "modificationTime": 1681196324172,
    "partitionValues": {},
    "path": "part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet",
    "size": 1336,
    "stats": "{\"numRecords\":9,\"minValues\":{\"content\":\"+1\",\"issue_number\":57,\"year\":2022},\"maxValues\":{\"year\":2022,\"issue_number\":188,\"content\":\"hooray\"},\"nullCount\":{\"issue_number\":0,\"content\":0,\"year\":0}}",
    "tags": null
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append"
    },
    "timestamp": 1681196324174
  }
}
Note that while here we break up our CREATE TABLE reactions AS ... statement into two distinct commits (roughly CREATE + INSERT), this isn't strictly enforced by the Delta protocol (and may change in later Seafowl versions). In fact, many actions can be mixed together when committing a Delta transaction, including creating a new table and attaching some files to it simultaneously.
What the protocol does enforce, however, is that the physical files must first be written to storage, and only in the next phase can a commit referencing these files be attempted. Since the object store abstraction doesn't have locking or transaction primitives, the delta-rs crate allows using implementation-specific utilities to achieve the required atomicity. For instance, in the case of the AWS S3 object store, there is the option of using DynamoDB as a lock to help mediate concurrent writes.
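At the time of writing, delta-rs wires this up via environment variables, along these lines (the exact variable names may differ between versions, so treat this as a sketch and check the delta-rs docs):

# Ask the delta-rs S3 backend to serialize commits through a DynamoDB lock
$ export AWS_S3_LOCKING_PROVIDER=dynamodb
$ export DYNAMO_LOCK_TABLE_NAME=seafowl_delta_lock
$ export DYNAMO_LOCK_PARTITION_KEY_VALUE=00c853f9-48b6-4531-881a-f5e1f546ed0e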
DML statements and partition pruning
Each individual Seafowl write statement (INSERT, UPDATE or DELETE) gets executed as a single Delta transaction, resulting in a new table version. The INSERT statement is append-only, meaning that all new data is add-ed to the new table version. For instance, if we want to sync our Seafowl GitHub reactions table with fresh 2023 data we can run:
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "INSERT INTO reactions
SELECT content, issue_number, 2023 as year
FROM staging.seafowl_issue_reactions
WHERE date_part('year', created_at) = 2023;
SELECT content, count(*) as count
FROM reactions
GROUP BY content
ORDER BY count DESC"}
EOF
{"content":"+1","count":9}
{"content":"heart","count":5}
{"content":"hooray","count":2}
This results in the creation of an additional data file and a new table version (2):
$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet

1 directory, 5 files
$ jq --sort-keys . _delta_log/00000000000000000002.json
{
  "add": {
    "path": "part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet",
    "stats": "{\"numRecords\":7,\"minValues\":{\"issue_number\":137,\"year\":2023,\"content\":\"+1\"},\"maxValues\":{\"content\":\"hooray\",\"issue_number\":349,\"year\":2023},\"nullCount\":{\"issue_number\":0,\"content\":0,\"year\":0}}",
    ...
  }
}
{
  "commitInfo": {
    "operation": "WRITE",
    ...
  }
}
Note that, as per the Delta log stats, the two data files are neatly partitioned by year, reflecting the order of data insertion. This is very handy, as delta-rs will optimize query planning by scoping the necessary table scans to only the relevant data files, via a process known as partition pruning. We can see this clearly when we EXPLAIN a query that uses a filter clause on the year column:
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "EXPLAIN SELECT content
FROM reactions
WHERE year = 2023"}
EOF
{"plan":"Projection: reactions.content\n Filter: reactions.year = Int64(2023)\n TableScan: reactions projection=[content, year], partial_filters=[reactions.year = Int64(2023)]","plan_type":"logical_plan"}
{"plan":"ProjectionExec: ... partitions={1 group: [[part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet]]}, predicate=year = Int64(2023), pruning_predicate=year_min@0 <= 2023 AND 2023 <= year_max@1, projection=[content, year]\n","plan_type":"physical_plan"}
Note that only a single partition (the one from version 2) is referenced in the table scan (the partitions field in the physical plan), since the other data file (from version 1) does not contain any records where the year is 2023.
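As a sanity check in the other direction, filtering on a year that neither data file contains should prune both of them, leaving the table scan with no partitions to read at all:

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "EXPLAIN SELECT content
FROM reactions
WHERE year = 2021"}
EOF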
In contrast to INSERT, a DELETE involves another type of Delta action, named remove, which severs the logical connection between a data file and the new version, though it does not remove the data file physically. An UPDATE statement can contain a mixture of both, as assignments replace some old values with new ones. To take a slightly contrived example, assume we want to replace the content in our reactions table with the actual emojis it represents (note that we also record the timestamp beforehand, for later use):
$ VERSION_2_TIMESTAMP=$(date -uIseconds)
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "UPDATE reactions
SET content = CASE
WHEN content = 'hooray' THEN '🎉'
WHEN content = 'heart' THEN '❤️'
WHEN content = '+1' THEN '👍'
END
WHERE year = 2023;
SELECT year, content, count(*) as count
FROM reactions
GROUP BY year, content
ORDER BY count DESC"}
EOF
{"content":"+1","count":5,"year":2022}
{"content":"👍","count":4,"year":2023}
{"content":"heart","count":3,"year":2022}
{"content":"❤️","count":2,"year":2023}
{"content":"hooray","count":1,"year":2022}
{"content":"🎉","count":1,"year":2023}
This leads to table version 3 with a new data file:
$ tree .
.
├── _delta_log
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ ├── 00000000000000000002.json
│ └── 00000000000000000003.json
├── part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet
├── part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet
1 directory, 7 files
The corresponding log records the addition of the new file to the present table state, as well as the removal of the file that preceded it:
$ jq --sort-keys . _delta_log/00000000000000000003.json
{
  "add": {
    "dataChange": true,
    "modificationTime": 1681196616085,
    "partitionValues": {},
    "path": "part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet",
    "size": 1347,
    "stats": "{\"numRecords\":7,\"minValues\":{\"content\":\"❤️\",\"issue_number\":137,\"year\":2023},\"maxValues\":{\"content\":\"👍\",\"issue_number\":349,\"year\":2023},\"nullCount\":{\"content\":0,\"issue_number\":0,\"year\":0}}",
    "tags": null
  }
}
{
  "remove": {
    "dataChange": true,
    "deletionTimestamp": 1681196616085,
    "extendedFileMetadata": true,
    "partitionValues": {},
    "path": "part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet",
    "size": 1338,
    "tags": null
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "timestamp": 1681196616085
  }
}
This again has to do with partition pruning, but this time in the context of writes. Namely, since the new version overwrites the rows with year 2023, the prior partition that contained those rows is now obsolete, which is why it gets removed from the state. Such remove actions are referred to as tombstones in Delta protocol lingo.
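Conveniently, this also means that the full set of tombstones accrued so far can be read straight off the log; in our case that's just the superseded 2023 file:

$ jq -r '.remove.path // empty' _delta_log/*.json
part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet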
Note that the version 3 log doesn't mention anything related to the initial partition, stemming from version 1, meaning that the original add action is still valid. This is because that partition contains the data where year is 2022, so nothing there was changed by our UPDATE statement.
One more thing worth mentioning is that, besides the regular log entries in the form of versioned JSON files, the Delta protocol prescribes checkpoints as a way to short-circuit the replay of log actions. By default these are created after every 10 versions, and consist of a Parquet file capturing the complete table state up to that version, which subsequent log entries then build upon.
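Had we kept committing to our table, after version 10 the log directory would look something like the sketch below, with the _last_checkpoint file pointing readers at the newest checkpoint so they can skip replaying versions 0 through 10 file by file:

_delta_log
├── 00000000000000000000.json
├── ...
├── 00000000000000000010.checkpoint.parquet
├── 00000000000000000010.json
└── _last_checkpoint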
Finally, note that we can recreate snapshots of earlier table versions, given that both the logs and the data files are still there, which facilitates time travel querying. For instance, we can use this to calculate a SQL diff between the current and the previous table version:
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "WITH reactions_diff AS (
SELECT *, 'added' as action FROM (
SELECT * FROM reactions
EXCEPT ALL
SELECT * FROM reactions('$VERSION_2_TIMESTAMP')
)
UNION ALL
SELECT *, 'removed' as action FROM (
SELECT * FROM reactions('$VERSION_2_TIMESTAMP')
EXCEPT ALL
SELECT * FROM reactions
)
)
SELECT action, content, count(*) as count
FROM reactions_diff
GROUP BY action, content
ORDER BY count DESC"}
EOF
{"action":"removed","content":"+1","count":4}
{"action":"added","content":"👍","count":4}
{"action":"added","content":"❤️","count":2}
{"action":"removed","content":"heart","count":2}
{"action":"added","content":"🎉","count":1}
{"action":"removed","content":"hooray","count":1}
However, once we run the VACUUM TABLE reactions command, all tombstones will be deleted, so time travel to the previous version will no longer work.
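A sketch of that cleanup; afterwards, the file tombstoned in version 3 should be physically gone, and with it the ability to reconstruct version 2:

$ curl -H "Content-Type: application/json" http://localhost:8080/q \
    -d '{"query": "VACUUM TABLE reactions"}'
$ ls *.parquet
part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet
part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet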
We've only scratched the surface of the Delta protocol's capabilities, and we're looking forward to further developments and integration with the delta-rs project. One such example would be adding support for partition columns in Seafowl, which would be a natural fit for our example reactions table.
In particular, this would mean writing nested data files in separate sub-folders, named after the value of each partition column, i.e. year=2022/part-00000-8f6... and year=2023/part-00000-3d1.... This would enable a more optimal partition pruning strategy. Namely, using the log stats for scoping the relevant partitions, as done above, results in inexact partition pruning: the source doesn't guarantee that all returned rows pass the provided filter, so DataFusion has to keep the filtering node and do a pass over all the rows to ensure they abide by the clause used. With partition columns, pruning is exact, relieving DataFusion of that additional work.
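For our reactions table, such a partitioned layout would look something like this (a sketch, reusing the file names from above):

.
├── _delta_log
│   └── ...
├── year=2022
│   └── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet
└── year=2023
    └── part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet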
In the long term, what gets us most excited is the prospect of integration with projects like Nessie and lakeFS, which provide a data version control layer for table formats such as Delta Lake. This would endow Seafowl with Git-for-data semantics, and thus open a whole new world of possible applications.
Footnotes:

1. The previous custom table format retains DROP and VACUUM support to facilitate migration.
3. Via the VACUUM DATABASE command.
4. Governed by the max_partition_size config parameter.