This vignette explains how datapond works under the
hood. It walks through each file in the package, explaining what the
code does and why it’s designed that way.
## Package Structure

```
datapond/
├── DESCRIPTION            # Package metadata
├── NAMESPACE              # Exports and imports
├── LICENSE                # MIT license
├── R/
│   ├── db_connect.R       # Connection management
│   ├── db_read.R          # Read functions
│   ├── db_write.R         # Write functions
│   ├── upsert.R           # MERGE operations
│   ├── metadata.R         # Snapshot/catalog queries
│   ├── discovery.R        # List/exists functions
│   ├── maintenance.R      # Vacuum, rollback, diff
│   ├── docs.R             # Documentation & data dictionary
│   ├── preview.R          # Write preview functions
│   ├── browser.R          # Shiny browser launcher
│   ├── browser_ui.R       # Shiny browser UI module
│   ├── browser_server.R   # Shiny browser server module
│   └── zzz.R              # Package load/unload hooks
└── vignettes/
    ├── concepts.Rmd           # Conceptual background
    └── code-walkthrough.Rmd   # This file
```
## Core Design Decisions

### 1. Singleton Connection Pattern

The package maintains a single connection stored in a private environment (`.db_env`).

Why?

- Prevents users from accidentally creating multiple connections
- Simplifies the API (no need to pass connection objects around)
- Ensures cleanup happens properly

All functions retrieve the connection via `.db_get_con()` rather than creating new ones.
### 2. Input Validation

All user inputs are validated early with clear error messages:

``` r
.db_validate_name <- function(x, arg = "name") {
  # Must be a single non-empty string
  # Only allows A-Z a-z 0-9 _ -
  # Prevents SQL injection and path traversal
}
```

This catches problems like `table = "../../../etc/passwd"` before they cause harm.
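The comments above describe the contract rather than the code. A minimal implementation along those lines might look like this (a hypothetical sketch; the package's actual validator may differ in details):

``` r
# Hypothetical sketch of the validator described above -
# not the package's actual implementation.
.db_validate_name <- function(x, arg = "name") {
  # Must be a single non-empty string
  if (!is.character(x) || length(x) != 1L || is.na(x) || !nzchar(x)) {
    stop(sprintf("`%s` must be a single non-empty string", arg), call. = FALSE)
  }
  # Only allow A-Z a-z 0-9 _ - : blocks SQL injection and path traversal
  if (!grepl("^[A-Za-z0-9_-]+$", x)) {
    stop(sprintf("`%s` contains invalid characters", arg), call. = FALSE)
  }
  invisible(x)
}

.db_validate_name("sales_2024")                # passes silently
try(.db_validate_name("../../../etc/passwd"))  # errors before reaching SQL
```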
## `R/db_connect.R` - Connection Management

This file handles connecting to and disconnecting from DuckLake.

### Private Environment and Helpers

This creates an isolated environment that persists for the R session. We store:

- `con` - the DuckDB connection object
- `data_path` - root path for parquet files
- `catalog` - the DuckLake catalog name
- `catalog_type` - `"duckdb"`, `"sqlite"`, or `"postgres"`
- `metadata_path` - path to the catalog file, or a connection string
``` r
.db_get_con <- function() {
  # Returns the connection if it exists and is valid, NULL otherwise
  if (exists("con", envir = .db_env) && DBI::dbIsValid(.db_env$con)) {
    return(.db_env$con)
  }
  NULL
}
```

This is the safe way to get the connection - it checks validity, not just existence.
``` r
.db_get <- function(name, default = NULL) {
  # Safe getter with a default value
  if (exists(name, envir = .db_env)) {
    get(name, envir = .db_env)
  } else {
    default
  }
}
```

Used throughout the package to retrieve stored values.
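As a self-contained illustration of this pattern (with the DBI validity check stripped out, so it runs without a database):

``` r
# Standalone sketch of the private-environment state pattern.
# The DBI validity check from .db_get_con() is omitted so this runs anywhere.
.db_env <- new.env(parent = emptyenv())

.db_get <- function(name, default = NULL) {
  if (exists(name, envir = .db_env)) get(name, envir = .db_env) else default
}

assign("catalog", "cso", envir = .db_env)
.db_get("catalog")            # "cso"
.db_get("data_path", "./")    # key absent, so the default is returned
```

Because `.db_env` is created with `emptyenv()` as its parent, lookups never fall through to the global environment by accident.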
### Catalog Backend Helpers

These internal functions handle the different catalog backend types:

``` r
.db_build_ducklake_dsn <- function(catalog_type, metadata_path) {
  switch(catalog_type,
    duckdb   = paste0("ducklake:", metadata_path),
    sqlite   = paste0("ducklake:sqlite:", metadata_path),
    postgres = paste0("ducklake:postgres:", metadata_path),
    stop("Unknown catalog_type")
  )
}
```

This builds the correct DuckLake connection string for each backend:

- DuckDB: `ducklake:metadata.ducklake`
- SQLite: `ducklake:sqlite:catalog.sqlite`
- PostgreSQL: `ducklake:postgres:dbname=... host=...`
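Because `.db_build_ducklake_dsn()` is pure string manipulation, it is easy to exercise on its own (definition repeated so the snippet is self-contained):

``` r
.db_build_ducklake_dsn <- function(catalog_type, metadata_path) {
  switch(catalog_type,
    duckdb   = paste0("ducklake:", metadata_path),
    sqlite   = paste0("ducklake:sqlite:", metadata_path),
    postgres = paste0("ducklake:postgres:", metadata_path),
    stop("Unknown catalog_type")  # switch default: reached for unknown types
  )
}

.db_build_ducklake_dsn("duckdb", "metadata.ducklake")
# "ducklake:metadata.ducklake"
.db_build_ducklake_dsn("sqlite", "catalog.sqlite")
# "ducklake:sqlite:catalog.sqlite"
```

Note that the unnamed final argument of `switch()` acts as the fallthrough, so an unrecognized `catalog_type` raises an error instead of silently returning `NULL`.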
``` r
.db_load_catalog_extensions <- function(con, catalog_type) {
  # Always need ducklake
  try(DBI::dbExecute(con, "INSTALL ducklake"), silent = TRUE)
  DBI::dbExecute(con, "LOAD ducklake")
  # Load the backend-specific extension
  if (catalog_type == "sqlite") {
    try(DBI::dbExecute(con, "INSTALL sqlite"), silent = TRUE)
    DBI::dbExecute(con, "LOAD sqlite")
  } else if (catalog_type == "postgres") {
    try(DBI::dbExecute(con, "INSTALL postgres"), silent = TRUE)
    DBI::dbExecute(con, "LOAD postgres")
  }
}
```

DuckDB needs extra extensions loaded for the SQLite and PostgreSQL backends.
### `db_connect()` - DuckLake Connection

``` r
db_connect <- function(duckdb_db = ":memory:",
                       catalog = "cso",
                       catalog_type = c("duckdb", "sqlite", "postgres"),
                       metadata_path = "metadata.ducklake",
                       data_path = "//CSO-NAS/DataLake",
                       snapshot_version = NULL,
                       snapshot_time = NULL,
                       ...) {
  catalog_type <- match.arg(catalog_type)

  # Create the DuckDB connection
  con <- DBI::dbConnect(duckdb::duckdb(), dbdir = duckdb_db)

  # Load DuckLake and backend-specific extensions
  .db_load_catalog_extensions(con, catalog_type)

  # Build the backend-specific connection string
  ducklake_dsn <- .db_build_ducklake_dsn(catalog_type, metadata_path)

  # Build the ATTACH statement with options
  attach_opts <- c(glue::glue("DATA_PATH {.db_sql_quote(data_path)}"))
  if (!is.null(snapshot_version)) {
    attach_opts <- c(attach_opts, glue::glue("SNAPSHOT_VERSION {as.integer(snapshot_version)}"))
  }
  attach_sql <- glue::glue(
    "ATTACH {.db_sql_quote(ducklake_dsn)} AS {catalog} ({paste(attach_opts, collapse = ', ')})"
  )

  # Attach, with helpful error messages
  tryCatch({
    DBI::dbExecute(con, attach_sql)
  }, error = function(e) {
    DBI::dbDisconnect(con, shutdown = TRUE)
    hint <- switch(catalog_type,
      sqlite = "Ensure the sqlite extension is available and the metadata file path is accessible.",
      postgres = "Ensure PostgreSQL is running and the connection string is correct.",
      duckdb = "Ensure the metadata file path is accessible."
    )
    stop("Failed to attach DuckLake catalog.\n", hint, "\n\nOriginal error: ", e$message)
  })

  DBI::dbExecute(con, glue::glue("USE {catalog}"))

  # Store connection info
  assign("con", con, envir = .db_env)
  assign("catalog", catalog, envir = .db_env)
  assign("catalog_type", catalog_type, envir = .db_env)
  assign("metadata_path", metadata_path, envir = .db_env)
  assign("data_path", data_path, envir = .db_env)
  ...
}
```

Key points:

- Supports three catalog backends (DuckDB, SQLite, PostgreSQL)
- The ATTACH statement connects the DuckLake catalog to the DuckDB session
- Error messages include backend-specific troubleshooting hints
## `R/db_write.R` - Writing Data

### `db_write()` - DuckLake Tables

``` r
db_write <- function(data, schema = "main", table,
                     mode = c("overwrite", "append"),
                     partition_by = NULL,
                     commit_author = NULL,
                     commit_message = NULL) {
  # Register the data temporarily
  tmp <- .db_temp_name()
  duckdb::duckdb_register(con, tmp, data)

  DBI::dbExecute(con, "BEGIN")

  # Set commit metadata if provided
  if (!is.null(commit_author) || !is.null(commit_message)) {
    DBI::dbExecute(con, glue::glue(
      "CALL ducklake_set_commit_message({.db_sql_quote(catalog)}, {author}, {msg})"
    ))
  }

  # Write the data
  if (mode == "overwrite") {
    sql <- glue::glue("CREATE OR REPLACE TABLE {qname} AS SELECT * FROM {tmp}")
  } else {
    sql <- glue::glue("INSERT INTO {qname} SELECT * FROM {tmp}")
  }
  tryCatch({
    DBI::dbExecute(con, sql)
    # Set partitioning if specified
    if (!is.null(partition_by)) {
      db_set_partitioning(schema, table, partition_by)
    }
    DBI::dbExecute(con, "COMMIT")
  }, error = function(e) {
    DBI::dbExecute(con, "ROLLBACK")
    stop(e$message)
  })
}
```

Key points:

- Uses transactions (`BEGIN`/`COMMIT`) for atomicity
- `ROLLBACK` on error ensures no partial changes
- `ducklake_set_commit_message(catalog, author, message)` records metadata in the snapshot
- Partitioning is set after table creation
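The BEGIN/COMMIT/ROLLBACK control flow can be seen in isolation with a mock executor standing in for `DBI::dbExecute()` (everything here is hypothetical scaffolding, not package code):

``` r
# Mock executor in place of DBI::dbExecute(): records every statement
# and fails on a marker so the rollback path becomes observable.
executed <- character(0)
exec <- function(sql) {
  executed <<- c(executed, sql)
  if (grepl("FAIL", sql)) stop("simulated write failure")
}

safe_write <- function(sql) {
  exec("BEGIN")
  tryCatch({
    exec(sql)
    exec("COMMIT")
  }, error = function(e) {
    exec("ROLLBACK")
    stop(e$message, call. = FALSE)
  })
}

try(safe_write("INSERT ... FAIL ..."))
tail(executed, 1)   # "ROLLBACK" - the COMMIT was never reached
```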
## `R/upsert.R` - MERGE Operations

### `db_upsert()` - Update + Insert

``` r
db_upsert <- function(data, schema = "main", table, by,
                      strict = TRUE, update_cols = NULL,
                      commit_author = NULL, commit_message = NULL) {
  # Validate that the key columns exist in both data and target
  missing_keys <- setdiff(by, names(data))
  if (length(missing_keys) > 0) stop(...)

  # Strict mode: reject duplicate keys in the incoming data
  if (strict) {
    dup_check_sql <- glue::glue("
      SELECT {by_sql}, COUNT(*) AS n
      FROM {tmp}
      GROUP BY {by_sql}
      HAVING COUNT(*) > 1
    ")
    dups <- DBI::dbGetQuery(con, dup_check_sql)
    if (nrow(dups) > 0) stop("Duplicate keys found...")
  }

  # Build the MERGE statement
  on_sql <- paste(glue::glue("t.{by} = s.{by}"), collapse = " AND ")

  # The UPDATE clause depends on update_cols
  update_clause <- if (is.null(update_cols)) {
    "WHEN MATCHED THEN UPDATE"  # DuckDB shorthand: update all columns
  } else if (length(update_cols) == 0) {
    ""  # Insert-only mode
  } else {
    glue::glue("WHEN MATCHED THEN UPDATE SET {update_sql}")
  }

  merge_sql <- glue::glue("
    MERGE INTO {qname} AS t
    USING {tmp} AS s
    ON ({on_sql})
    {update_clause}
    WHEN NOT MATCHED THEN INSERT ...
  ")

  # Execute in a transaction
  DBI::dbExecute(con, "BEGIN")
  DBI::dbExecute(con, merge_sql)
  DBI::dbExecute(con, "COMMIT")
}
```

Key points:

- `MERGE INTO` is standard SQL for upsert operations
- `strict = TRUE` prevents accidental duplicates (a common data quality issue)
- `update_cols = NULL` means update everything; `character(0)` means insert-only
- Everything is wrapped in a transaction for safety
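The strict-mode duplicate check is equivalent to the following plain-R logic (a sketch on a data frame instead of SQL):

``` r
# Plain-R equivalent of the SQL duplicate-key check (sketch, not package code)
has_dup_keys <- function(data, by) {
  anyDuplicated(data[by]) > 0
}

incoming <- data.frame(id = c(1, 2, 2), value = c("a", "b", "c"))
has_dup_keys(incoming, "id")   # TRUE - strict mode would reject this batch
```

If the same key appears twice in the incoming batch, the MERGE result would depend on row order, which is exactly the ambiguity `strict = TRUE` refuses to allow.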
## `R/docs.R` - Documentation & Data Dictionary

### Metadata Storage

Documentation metadata is stored in a `_metadata` schema within the DuckLake catalog:

``` r
.db_ensure_metadata_table <- function(con, catalog) {
  DBI::dbExecute(con, "CREATE SCHEMA IF NOT EXISTS {catalog}._metadata")
  DBI::dbExecute(con, "CREATE TABLE IF NOT EXISTS {catalog}._metadata.table_docs (...)")
  DBI::dbExecute(con, "CREATE TABLE IF NOT EXISTS {catalog}._metadata.column_docs (...)")
}
```

### `db_describe()` - Document a Table

``` r
db_describe <- function(schema = "main", table = NULL,
                        description = NULL, owner = NULL, tags = NULL) {
  # Upsert into _metadata.table_docs
  .db_ensure_metadata_table(con, catalog)
  DBI::dbExecute(con, "DELETE FROM ... WHERE schema = ... AND table = ...")
  DBI::dbExecute(con, "INSERT INTO ... VALUES (...)")
}
```

### `db_dictionary()` - Generate a Data Dictionary

``` r
db_dictionary <- function(schema = NULL, include_columns = TRUE) {
  # Query information_schema and the _metadata tables
  tables <- DBI::dbGetQuery(con, "SELECT * FROM information_schema.tables ...")
  for (tbl in tables) {
    meta <- db_get_docs(schema = ..., table = ...)
    cols <- DBI::dbGetQuery(con, "SELECT * FROM information_schema.columns ...")
  }
  do.call(rbind, rows)
}
```

### `db_search()` - Find Tables

``` r
db_search <- function(pattern, field = c("all", "name", "description", "owner", "tags")) {
  # Get the full dictionary (without columns, for speed)
  dict <- db_dictionary(include_columns = FALSE)
  # Filter by pattern match
  matches <- switch(field,
    all = grepl(pattern, dict$name) | grepl(pattern, dict$description) | ...,
    name = grepl(pattern, dict$name),
    description = grepl(pattern, dict$description),
    ...
  )
  dict[matches, ]
}
```

### `db_lineage()` - Record Data Lineage

``` r
db_lineage <- function(schema = "main", table, sources, transformation = NULL) {
  # Ensure the lineage table exists
  DBI::dbExecute(con, "CREATE TABLE IF NOT EXISTS {catalog}._metadata.lineage (...)")
  # Upsert the lineage record
  sources_str <- paste(sources, collapse = "; ")
  DBI::dbExecute(con, "DELETE FROM ... WHERE schema = ... AND table = ...")
  DBI::dbExecute(con, "INSERT INTO ... VALUES (...)")
}
```

Key points:

- Lineage is stored in the `_metadata.lineage` table alongside documentation
- Sources are joined with a `;` separator for storage and split on retrieval
- `db_get_lineage()` retrieves and parses the stored information
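The join/split round trip for the sources field is simple to demonstrate:

``` r
# Round trip for the "; "-separated sources field (standalone sketch)
sources <- c("raw.sales", "raw.customers")
sources_str <- paste(sources, collapse = "; ")
sources_str                          # "raw.sales; raw.customers"
strsplit(sources_str, ";\\s*")[[1]]  # back to c("raw.sales", "raw.customers")
```

Splitting on `;` followed by optional whitespace keeps the round trip lossless as long as table names themselves never contain semicolons, which `.db_validate_name()` guarantees.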
## `R/preview.R` - Write Previews

### `db_preview_write()` - Preview Before Writing

``` r
db_preview_write <- function(data, schema = "main", table,
                             mode = c("overwrite", "append")) {
  preview <- list(
    mode = mode,
    table_exists = db_table_exists(schema = schema, table = table),
    incoming = list(
      rows = nrow(data),
      cols = ncol(data),
      columns = names(data)
    )
  )
  # If the target exists, compare schemas
  if (preview$table_exists) {
    existing_cols <- db_table_cols(schema = schema, table = table)
    preview$schema_changes <- list(
      new_columns = setdiff(names(data), existing_cols),
      removed_columns = setdiff(existing_cols, names(data))
    )
  }
  .print_preview(preview)
  invisible(preview)
}
```

Key points:

- Gathers all relevant information without making any changes
- Compares the incoming schema with the existing schema
- Returns structured data for programmatic use and prints a human-readable summary
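The schema comparison boils down to two `setdiff()` calls, which behave like this:

``` r
# How the schema-change comparison works (standalone sketch)
incoming_cols <- c("id", "name", "email")
existing_cols <- c("id", "name", "phone")

setdiff(incoming_cols, existing_cols)  # new columns:     "email"
setdiff(existing_cols, incoming_cols)  # removed columns: "phone"
```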
### `db_preview_upsert()` - Preview Insert vs Update Counts

``` r
db_preview_upsert <- function(data, schema, table, by, update_cols = NULL) {
  # Register the data temporarily
  tmp <- .db_temp_name()
  duckdb::duckdb_register(con, tmp, data)

  # Count matches (updates)
  match_sql <- glue::glue("
    SELECT COUNT(*) FROM {tmp} s
    INNER JOIN {qname} t ON {join_on_keys}
  ")
  matches <- DBI::dbGetQuery(con, match_sql)$n

  # Check for duplicate keys
  dup_sql <- glue::glue("
    SELECT {key_cols}, COUNT(*) FROM {tmp}
    GROUP BY {key_cols} HAVING COUNT(*) > 1
  ")
  dups <- DBI::dbGetQuery(con, dup_sql)

  preview$impact <- list(
    inserts = nrow(data) - matches,
    updates = matches,
    duplicates_in_incoming = nrow(dups)
  )
  .print_upsert_preview(preview)
  invisible(preview)
}
```
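The insert/update arithmetic mirrors a simple membership count; a plain-R sketch of what the SQL computes (single key column, hypothetical values):

``` r
# Plain-R version of the insert/update counting (sketch, single key column)
incoming_keys <- c(1, 2, 3, 4)
target_keys   <- c(3, 4, 5)

updates <- sum(incoming_keys %in% target_keys)  # rows that match -> updates
inserts <- length(incoming_keys) - updates      # the remainder  -> inserts
updates  # 2
inserts  # 2
```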
## `R/discovery.R` - Finding Data

### DuckLake Discovery

``` r
db_tables <- function(schema = "main") {
  sql <- glue::glue("
    SELECT table_name
    FROM information_schema.tables
    WHERE table_catalog = {.db_sql_quote(catalog)}
      AND table_schema = {.db_sql_quote(schema)}
      AND table_type = 'BASE TABLE'
  ")
  DBI::dbGetQuery(con, sql)$table_name
}
```

Key points:

- Uses the standard `information_schema` views
- Filters by catalog to avoid confusion with other attached databases
## `R/maintenance.R` - Admin Operations

### `db_vacuum()` - Clean Up Old Snapshots

``` r
db_vacuum <- function(older_than = "30 days", dry_run = TRUE) {
  # Get the snapshots beforehand
  snapshots_before <- db_snapshots()
  if (dry_run) {
    # Calculate what would be removed and show a preview
    return(invisible(to_remove))
  }
  # Actually vacuum
  DBI::dbExecute(con, glue::glue(
    "CALL ducklake_vacuum({.db_sql_quote(catalog)}, INTERVAL '30 days')"
  ))
}
```

Key points:

- `dry_run = TRUE` by default prevents accidental deletion
- Shows a clear before/after summary
### `db_rollback()` - Restore a Previous Version

``` r
db_rollback <- function(schema, table, version = NULL, timestamp = NULL) {
  # Build the time travel clause
  at_clause <- glue::glue("AT (VERSION => {version})")
  # Rollback = create a new version containing the old data
  rollback_sql <- glue::glue(
    "CREATE OR REPLACE TABLE {qname} AS SELECT * FROM {qname} {at_clause}"
  )
  DBI::dbExecute(con, rollback_sql)
}
```

Key points:

- Rollback creates a NEW snapshot containing the old data (non-destructive)
- You can always "roll forward" by rolling back to a later version
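The generated statement is just a self-referential `CREATE OR REPLACE` with a time-travel clause; with hypothetical values (and `sprintf()` in place of glue, so the snippet is dependency-free) it looks like:

``` r
# Building the rollback statement with base sprintf() instead of glue
# (qname and version are hypothetical example values)
qname   <- "cso.main.sales"
version <- 12
sprintf("CREATE OR REPLACE TABLE %s AS SELECT * FROM %s AT (VERSION => %d)",
        qname, qname, version)
# "CREATE OR REPLACE TABLE cso.main.sales AS SELECT * FROM cso.main.sales AT (VERSION => 12)"
```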
### `db_diff()` - Compare Versions

``` r
db_diff <- function(schema, table, from_version, to_version, key_cols = NULL) {
  # ADDED: rows in 'to' but not in 'from'
  added_sql <- glue::glue("SELECT * FROM {to_ref} EXCEPT SELECT * FROM {from_ref}")
  # REMOVED: rows in 'from' but not in 'to'
  removed_sql <- glue::glue("SELECT * FROM {from_ref} EXCEPT SELECT * FROM {to_ref}")
  result <- list(
    added = DBI::dbGetQuery(con, added_sql),
    removed = DBI::dbGetQuery(con, removed_sql)
  )
  # If key_cols are provided, find modified rows (same key, different values)
  if (!is.null(key_cols)) {
    # Modified = key appears in both added and removed
    modified_sql <- glue::glue("
      SELECT new_tbl.*
      FROM ({added_sql}) AS new_tbl
      INNER JOIN ({removed_sql}) AS old_tbl
      ON {join_on_keys}
    ")
    result$modified <- DBI::dbGetQuery(con, modified_sql)
  }
  result
}
```
### `db_compact()` - Merge Small Files

``` r
db_compact <- function(schema = "main", table = NULL, max_files = NULL) {
  # Build the SQL call with optional parameters
  sql <- glue::glue("CALL ducklake_merge_adjacent_files({catalog}, {table}, ...)")
  # Get file stats before and after for reporting
  stats_before <- db_file_stats(schema, table)
  DBI::dbExecute(con, sql)
  stats_after <- db_file_stats(schema, table)
  # Report the reduction
  message("Files before: ", sum(stats_before$file_count))
  message("Files after:  ", sum(stats_after$file_count))
}
```

Key points:

- Uses DuckLake's `ducklake_merge_adjacent_files()` procedure
- The `max_files` parameter limits memory usage for large tables
- Files with incompatible schema versions cannot be merged
- Run `db_cleanup_files()` afterwards to remove the old files
### `db_file_stats()` - Check File Statistics

``` r
db_file_stats <- function(schema = "main", table = NULL) {
  sql <- glue::glue("FROM ducklake_table_info({.db_sql_quote(catalog)})")
  info <- DBI::dbGetQuery(con, sql)
  # Calculate the average file size and rows per file
  info$avg_file_bytes <- info$total_bytes / info$file_count
  info$avg_rows_per_file <- info$total_rows / info$file_count
  info
}
```

Key points:

- Uses DuckLake's `ducklake_table_info()` function
- A high file count with a small average size indicates compaction is needed
- Useful for monitoring and maintenance scheduling
### `db_cleanup_files()` - Remove Orphaned Files

``` r
db_cleanup_files <- function(dry_run = TRUE) {
  if (dry_run) {
    # Show a preview of what would be cleaned
    return(invisible(NA))
  }
  sql <- glue::glue("CALL ducklake_cleanup_old_files({.db_sql_quote(catalog)})")
  DBI::dbExecute(con, sql)
}
```

Key points:

- Orphaned files are created by `db_vacuum()` and `db_compact()`
- `dry_run = TRUE` by default for safety
- Reclaims disk space by removing unreferenced Parquet files
Key points for `db_diff()`:

- Uses SQL `EXCEPT` for set-difference operations
- With `key_cols`, it can distinguish "modified" rows from "added/removed"

---
## `R/zzz.R` - Package Hooks
``` r
# Null coalesce operator used throughout
`%||%` <- function(x, y) if (is.null(x) || length(x) == 0) y else x
.onAttach <- function(libname, pkgname) {
packageStartupMessage(
"datapond ", utils::packageVersion("datapond"), "\n",
"Use db_connect() to connect to a DuckLake catalog."
)
}
.onUnload <- function(libpath) {
# Clean up connection when package unloads
con <- .db_get_con()
if (!is.null(con)) {
try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
}
}
```

Key points:

- `.onAttach` runs when the package is attached (shows a helpful startup message)
- `.onUnload` ensures cleanup if the package is unloaded
- `%||%` handles both `NULL` and zero-length values
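A few calls make the difference from a NULL-only coalesce clear:

``` r
`%||%` <- function(x, y) if (is.null(x) || length(x) == 0) y else x

NULL %||% "default"           # "default"
character(0) %||% "default"   # "default" - zero-length values are also replaced
"value" %||% "default"        # "value"
```

This is stricter than the `rlang`-style `%||%`, which only replaces `NULL`; the zero-length case matters here because database queries often return zero-row results rather than `NULL`.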
## `R/browser.R`, `R/browser_ui.R`, `R/browser_server.R` - Interactive Browser

These files implement a Shiny gadget for interactively browsing the data lake.

### Architecture

The browser uses the Shiny module pattern, which allows it to be:

1. Used standalone via `db_browser()`
2. Embedded in other Shiny apps via `db_browser_ui()` + `db_browser_server()`

``` r
# Standalone usage
db_browser()

# Embedded in another app
ui <- fluidPage(
  db_browser_ui("my_browser")
)
server <- function(input, output, session) {
  db_browser_server("my_browser")
}
```
### `browser.R` - Launcher

``` r
db_browser <- function(height = "500px", viewer = c("dialog", "browser", "pane")) {
  # Check that the required packages are available
  .db_assert_browser_packages()
  # Build the app
  ui <- db_browser_app_ui(height = height)
  server <- function(input, output, session) {
    db_browser_server(id = "db_browser", height = height)
  }
  app <- shiny::shinyApp(ui = ui, server = server)
  # Run as a gadget
  shiny::runGadget(app, viewer = viewer_func, stopOnCancel = TRUE)
}
```
### `browser_ui.R` - Module UI

The UI is built with bslib for modern Bootstrap 5 styling:

``` r
db_browser_ui <- function(id, height = "500px") {
  ns <- shiny::NS(id)  # Namespace for the module

  # Sidebar with the tree view
  sidebar <- bslib::sidebar(
    shiny::uiOutput(ns("tree_view")),
    shiny::actionButton(ns("refresh_tree"), "Refresh")
  )

  # Main content with tabs
  main_content <- bslib::navset_card_tab(
    bslib::nav_panel("Preview", ...),
    bslib::nav_panel("Metadata", ...),
    bslib::nav_panel("Search", ...),
    bslib::nav_panel("Dictionary", ...)
  )

  bslib::page_sidebar(sidebar = sidebar, main_content)
}
```

Key points:

- `NS(id)` creates namespaced IDs so multiple instances don't conflict
- `bslib` provides modern Bootstrap 5 components
- The tree view is rendered dynamically from the available schemas and tables
### `browser_server.R` - Module Server

``` r
db_browser_server <- function(id, height = "500px") {
  shiny::moduleServer(id, function(input, output, session) {
    # Reactive values for selection state
    rv <- shiny::reactiveValues(
      selected_schema = NULL,
      selected_table = NULL
    )

    # Tree view
    output$tree_view <- shiny::renderUI({
      .render_ducklake_tree(ns, rv)
    })

    # Preview - loads data on button click
    preview_data <- shiny::eventReactive(input$load_preview, {
      db_read(rv$selected_schema, rv$selected_table) |>
        head(input$preview_rows) |>
        collect()
    })
    output$preview_table <- DT::renderDataTable({
      DT::datatable(preview_data(), filter = "top")
    })

    # Search
    search_results <- shiny::eventReactive(input$do_search, {
      db_search(input$search_pattern, field = input$search_field)
    })

    # Dictionary with download
    output$download_dict <- shiny::downloadHandler(
      filename = "data_dictionary.csv",
      content = function(file) {
        write.csv(db_dictionary(), file)
      }
    )
  })
}
```

Key points:

- `moduleServer()` creates the namespaced server logic
- `reactiveValues` tracks UI state (the selected table, etc.)
- `eventReactive` triggers expensive operations only on button click
- `DT::datatable` provides interactive tables with filtering
## Security Considerations

### SQL Injection Prevention

All user inputs that go into SQL are either:

- Validated with `.db_validate_name()` (only allows `A-Z a-z 0-9 _ -`)
- Quoted with `.db_sql_quote()` (escapes single quotes)
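The quoting helper presumably doubles embedded single quotes, SQL-style; a minimal sketch (hypothetical — the package's real `.db_sql_quote()` may differ):

``` r
# Hypothetical sketch of a single-quote escaper; the package's
# actual .db_sql_quote() may differ.
.db_sql_quote <- function(x) {
  paste0("'", gsub("'", "''", x), "'")
}

.db_sql_quote("O'Brien")   # "'O''Brien'"
.db_sql_quote("plain")     # "'plain'"
```

Doubling single quotes is the standard SQL escaping rule, so a value can never terminate the string literal it is embedded in.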
``` r
# This is safe:
table <- .db_validate_name(table)  # Rejects "../../../etc"
path <- .db_sql_quote(glob_path)   # Escapes quotes properly
```

## Testing Your Understanding
Try answering these questions:

- Why do we use `.db_get_con()` instead of accessing `.db_env$con` directly?
- What happens if `db_write()` fails halfway through?
- Why does `db_read()` return a lazy table instead of collected data?
- What's the difference between `update_cols = NULL` and `update_cols = character(0)` in `db_upsert()`?
- Why would you choose SQLite over DuckDB as a catalog backend?
- How does `db_preview_upsert()` know how many rows will be updated vs inserted?
- Where is documentation metadata stored?
## Contributing to the Package

When adding new functions:

- Always validate inputs with `.db_validate_name()` or custom validation
- Check the connection at the start of every function
- Use `.db_get()` and `.db_get_con()` to access stored state
- Return invisibly for side-effect functions, visibly for queries
- Add roxygen documentation with `@param`, `@return`, `@examples`, `@export`
- Update NAMESPACE if adding new exports
- Add tests in `tests/testthat/` for new functionality

Run `devtools::document()` after adding roxygen comments to regenerate the NAMESPACE and help files.