Datasets
Dataset-level task handlers — generating, importing, updating, deleting, duplicating, and cleaning up datasets and their attachments.
Overview
The worker supports seven dataset-level task types. Unlike ai_generate_records and ai_generate_fields, which operate on the contents and schema of an existing dataset, these tasks create, modify, or manage datasets themselves.
| Task Type | Description |
|---|---|
| ai_generate_datasets | Generate new datasets or enrich existing ones using AI |
| import_dataset | Create a dataset from an uploaded CSV, JSON, or NDJSON file |
| batch_update_datasets | Update name, description, tags, or options on multiple datasets |
| batch_delete_datasets | Soft-delete or permanently delete multiple datasets |
| duplicate_dataset | Copy a dataset's metadata and optionally all its records |
| cleanup_stale_attachments | Delete pending attachment files and DB rows older than a TTL |
| cleanup_dataset_attachments | Delete all attachment files for a deleted dataset from storage |
Database Tables
datasets Table
The primary table for dataset metadata. Dataset-level handlers read from and write to this table. The fields column holds the complete field schema as a JSONB array.
| Column | Type | Description |
|---|---|---|
| id | varchar (UUID) | Primary key, auto-generated |
| name | varchar(255) | Dataset display name |
| description | text (nullable) | Human-readable description |
| tags | jsonb | Array of string tags |
| fields | jsonb | Array of FieldDefinition objects — see the Fields page |
| options | jsonb | Freeform metadata. Workers read options->>'locked' and options->>'public'. Handlers may write a generated sub-object to record what was AI-generated. |
| deleted_at | timestamp (nullable) | Soft-delete timestamp. Workers must check deleted_at IS NULL before operating on a dataset. |
| created_by | varchar | FK to users.id — dataset owner |
| created_at | timestamp | Auto-set on insert |
| updated_at | timestamp | Updated on every change |
| updated_by | varchar | FK to users.id — worker sets this on every write |
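For illustration, a handler might apply the deleted_at and locked guards like this. This is a minimal sketch assuming a node-postgres Pool; the helper name is hypothetical, not the worker's actual code.

```typescript
import { Pool } from "pg";

const pool = new Pool(); // connection settings come from PG* env vars

// Hypothetical helper: load a dataset and enforce the guards described above.
async function loadWritableDataset(datasetId: string) {
  const { rows } = await pool.query(
    `SELECT id, name, fields, options
       FROM datasets
      WHERE id = $1
        AND deleted_at IS NULL`, // never operate on soft-deleted datasets
    [datasetId]
  );
  const dataset = rows[0];
  if (!dataset) throw new Error(`Dataset "${datasetId}" not found`);
  // options is JSONB, so pg returns it as a plain object here
  if (dataset.options?.locked) throw new Error(`Dataset "${datasetId}" is locked`);
  return dataset;
}
```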
dataset_shares Table
Controls who has access to a dataset. When any handler creates a new dataset, it must immediately insert an owner share for task.created_by; otherwise the creating user will not be able to see the new dataset. A sketch follows the table below.
| Column | Type | Description |
|---|---|---|
| dataset_id | varchar | FK to datasets.id |
| user_id | varchar | FK to users.id |
| role | varchar | Always 'owner' when created by a worker |
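A handler might create the dataset and its owner share in a single transaction, roughly like this sketch (assuming a node-postgres Pool; the plain-UUID ID generation is an assumption about the scheme, and the column lists follow the tables above):

```typescript
import { randomUUID } from "node:crypto";
import { Pool } from "pg";

const pool = new Pool();

// Sketch: insert the dataset and its owner share atomically, so the creating
// user can always see the new dataset.
async function createDatasetWithOwnerShare(
  task: { created_by: string },
  meta: { name: string; description: string | null; tags: string[]; fields: unknown[] }
) {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    const id = randomUUID(); // assumed ID scheme, not confirmed by this doc
    await client.query(
      `INSERT INTO datasets (id, name, description, tags, fields, options, created_by, updated_by)
       VALUES ($1, $2, $3, $4, $5, '{}', $6, $6)`,
      // JSONB columns take stringified JSON, not raw JS arrays
      [id, meta.name, meta.description, JSON.stringify(meta.tags), JSON.stringify(meta.fields), task.created_by]
    );
    await client.query(
      `INSERT INTO dataset_shares (dataset_id, user_id, role) VALUES ($1, $2, 'owner')`,
      [id, task.created_by]
    );
    await client.query("COMMIT");
    return id;
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}
```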
pending_attachments Table
Tracks file uploads that are waiting to be associated with a record. Rows are written by the API server when a user uploads a file but hasn't yet saved the record. The cleanup_stale_attachments handler removes rows (and their storage files) that have been waiting too long.
| Column | Type | Description |
|---|---|---|
| id | varchar (UUID) | Primary key |
| storage_key | varchar | Object storage key used to delete the file |
| created_at | timestamp | When the upload was staged |
ai_generate_datasets
Generates new dataset schemas via AI (generate mode) or updates the metadata of existing datasets (enrich mode). Both modes send a structured prompt and post-process the AI-generated field definitions with restructureAIFields, which assigns fld_ IDs and nests option keys into an options sub-object (sketched after the table below).
| Property | Value |
|---|---|
| Task Type | ai_generate_datasets |
| Tables Used | tasks, task_events, datasets (insert or update), dataset_shares (insert for generate) |
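This page names restructureAIFields but not its implementation. As a rough illustration of the behavior just described, a sketch might look like the following; the ID scheme and the exact option-nesting rules are assumptions, not the worker's actual code.

```typescript
import { randomBytes } from "node:crypto";

// Hypothetical sketch of the post-processing step: assign a fld_ ID and nest
// all non-core keys under options. The real restructureAIFields may differ.
type RawAIField = { name: string; type: string; [key: string]: unknown };

function restructureAIFieldsSketch(raw: RawAIField[]) {
  return raw.map(({ name, type, ...rest }) => ({
    // 12-hex-char suffix assumed from IDs like fld_a1b2c3d4e5f6 elsewhere in this doc
    id: `fld_${randomBytes(6).toString("hex")}`,
    name,
    type,
    options: { ...rest }, // everything that is not id/name/type becomes options
  }));
}
```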
Input Schema
Generate mode:
{
"mode": "generate",
"count": 3,
"ai": {
"prompt": "Generate SaaS product datasets with pricing and feature data",
"model": "gpt-4o",
"temperature": 0.7,
"max_output_tokens": 16000
}
}
Enrich mode:
{
"mode": "enrich",
"datasetIds": ["ds_abc123", "ds_def456"],
"ai": {
"prompt": "Add tags and improve descriptions based on the existing fields",
"model": "gpt-4o",
"temperature": 0.5
}
}
| Field | Type | Description |
|---|---|---|
| mode | string | "generate" or "enrich". Defaults to "generate" if omitted. |
| count | number | Number of datasets to generate (generate mode only). Defaults to 1. |
| datasetIds | string[] | IDs of existing datasets to enrich (enrich mode only). Required for enrich mode. |
| ai.prompt | string? | User instruction. In generate mode, describes what datasets to create. In enrich mode, describes what to update. Falls back to auto-detected missing fields if omitted. |
| ai.model | string | Requested model (may be "auto"). Use task.metadata.resolvedModel as the authoritative value. |
| ai.temperature | number? | Sampling temperature (0–2). Defaults to 0.7. |
| ai.max_output_tokens | number? | Token limit for the AI call. Defaults to 16000. |
Generate Mode
A single AI call produces all count datasets at once using structured output (GENERATE_BASED_SCHEMA). Each dataset is then inserted into datasets and an owner share is created in dataset_shares. If an individual insert fails, that dataset is recorded with an error but the rest continue (partial success within generate mode).
Structured output: OpenAI provider uses jsonSchema structured output for guaranteed-valid JSON. Anthropic and Gemini use jsonMode. A JSON repair function handles any malformed responses.
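The partial-success loop in generate mode might look like this sketch, reusing the insert-plus-owner-share helper sketched under dataset_shares above (all names are illustrative):

```typescript
type GeneratedDataset = { name: string; description: string | null; tags: string[]; fields: unknown[] };
type Task = { created_by: string };

// See the transaction sketch in the dataset_shares section.
declare function createDatasetWithOwnerShare(task: Task, meta: GeneratedDataset): Promise<string>;

// Sketch: the single AI call has already produced all `count` datasets;
// each insert is then attempted independently (partial success).
async function insertGeneratedDatasets(task: Task, generated: GeneratedDataset[]) {
  const results: Array<Record<string, unknown>> = [];
  for (const ds of generated) {
    try {
      const id = await createDatasetWithOwnerShare(task, ds);
      results.push({ id, name: ds.name });
    } catch (err) {
      // A failed insert is recorded with an error sub-object; the rest continue.
      results.push({ name: ds.name, error: { message: (err as Error).message } });
    }
  }
  return results;
}
```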
Enrich Mode
Each dataset in datasetIds is enriched with a separate AI call (ENRICH_DATASET_SCHEMA). The current dataset state (name, description, tags, field names, and up to 5 sample records) is passed to the model. Only non-null fields returned by the model are applied. Cancellation is checked between datasets.
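A sketch of that loop, with hypothetical helpers standing in for the per-dataset AI call, the cancellation check, and the update write:

```typescript
type DatasetPatch = { name?: string | null; description?: string | null; tags?: string[] | null };

declare function isTaskCancelled(taskId: string): Promise<boolean>;
declare function enrichOne(datasetId: string): Promise<DatasetPatch>; // one ENRICH_DATASET_SCHEMA call
declare function applyDatasetUpdate(datasetId: string, updates: Record<string, unknown>): Promise<void>;

// Sketch: one AI call per dataset, cancellation checked between datasets,
// and null fields dropped so only model-provided values are written.
async function enrichDatasets(taskId: string, datasetIds: string[]) {
  for (const datasetId of datasetIds) {
    if (await isTaskCancelled(taskId)) break; // checked between datasets, never mid-call
    const patch = await enrichOne(datasetId);
    const updates = Object.fromEntries(
      Object.entries(patch).filter(([, value]) => value != null)
    );
    if (Object.keys(updates).length > 0) {
      await applyDatasetUpdate(datasetId, updates);
    }
  }
}
```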
Output Schema
{
"datasetIds": ["ds_xyz001", "ds_xyz002"],
"datasets": [
{
"id": "ds_xyz001",
"name": "SaaS Pricing Data",
"description": "Pricing plans for SaaS companies",
"tags": ["saas", "pricing"],
"fields": [...],
"tokenUsage": { "input_tokens": 1200, "output_tokens": 900, "total_tokens": 2100 }
}
],
"tokenUsage": { "input_tokens": 1200, "output_tokens": 900, "total_tokens": 2100 }
}
| Field | Description |
|---|---|
| datasetIds | IDs of successfully created or updated datasets |
| datasets | Per-dataset result objects. Failed inserts include an error sub-object instead of an id. |
| tokenUsage | Accumulated token usage across all AI calls |
import_dataset
Downloads a file from object storage, parses it, creates a new dataset, and batch-inserts the rows as records. The file is always deleted from storage at the end, whether the import succeeds or fails.
| Property | Value |
|---|---|
| Task Type | import_dataset |
| Tables Used | tasks, task_events, datasets (insert), dataset_shares (insert), dataset_records (insert) |
| Row Limit | 10,000 rows maximum |
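The guaranteed file cleanup described above is naturally a try/finally; a sketch, with hypothetical storage and import helpers (downloadObject, deleteObject, runImport are illustrative names):

```typescript
declare function downloadObject(key: string): Promise<Buffer>;
declare function deleteObject(key: string): Promise<void>;
declare function runImport(file: Buffer, input: { fileKey: string }): Promise<unknown>;

// Sketch: the staged upload is removed whether parsing/inserting succeeds or throws.
async function handleImportDataset(input: { fileKey: string }) {
  const file = await downloadObject(input.fileKey);
  try {
    return await runImport(file, input);
  } finally {
    // Always delete the uploaded file, even when the import failed.
    await deleteObject(input.fileKey).catch(() => {});
  }
}
```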
Input Schema
{
"fileKey": "imports/user_abc/companies.csv",
"name": "My Companies",
"description": "Imported from CSV",
"tags": ["import", "companies"],
"options": {},
"fields": [
{ "id": "fld_a1b2c3d4e5f6", "name": "Company", "type": "text", "options": { "required": true } },
{ "id": "fld_b2c3d4e5f6g7", "name": "Revenue", "type": "number" }
]
}
| Field | Type | Description |
|---|---|---|
| fileKey | string | Storage key of the uploaded file. Required. |
| name | string? | Dataset name. Defaults to "Untitled Dataset". |
| description | string? | Dataset description. Optional. |
| tags | string[]? | Tags for the dataset. Defaults to []. |
| options | object? | Freeform options stored on the dataset. Defaults to {}. |
| fields | FieldDefinition[]? | Explicit field schema. If omitted, fields are inferred from the file contents using type detection. |
Supported Formats
Format is detected from the file extension:
| Extension | Format | Notes |
|---|---|---|
| .csv | CSV | First row is treated as headers. All values are strings — type coercion is applied during validation. |
| .json | JSON | Must be a top-level array of objects. |
| .ndjson, .jsonl | Newline-delimited JSON | One JSON object per line. Empty lines are skipped. |
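Detection is a straightforward extension check; a minimal sketch:

```typescript
// Sketch of the extension check behind detectedFormat.
type ImportFormat = "csv" | "json" | "ndjson" | "jsonl";

function detectFormat(fileKey: string): ImportFormat {
  const ext = fileKey.slice(fileKey.lastIndexOf(".") + 1).toLowerCase();
  if (ext === "csv" || ext === "json" || ext === "ndjson" || ext === "jsonl") {
    return ext;
  }
  throw new Error(`Unsupported file extension: .${ext}`);
}
```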
Type Inference
When fields is not provided, the worker examines every value in each column and infers the most specific type that matches all of them. The inference priority is: boolean → integer → number → email → url → date → array → object → text. Any attachment type produced by inference is filtered out defensively.
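A sketch of that priority walk (the per-type predicates and the skipping of empty strings are assumptions; the worker's real detectors may be stricter):

```typescript
// Sketch of the priority walk; matches() stands in for the real detectors.
const INFERENCE_ORDER = [
  "boolean", "integer", "number", "email", "url", "date", "array", "object",
] as const;

declare function matches(type: string, value: string): boolean;

function inferColumnType(values: string[]): string {
  for (const type of INFERENCE_ORDER) {
    // The most specific type that matches every non-empty value wins.
    if (values.every((v) => v === "" || matches(type, v))) return type;
  }
  return "text"; // fallback when nothing more specific matches every value
}
```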
Type Coercion
During validation, values from CSV files (which are always strings) are coerced to their field type. Numbers strip currency symbols. Booleans accept "true"/"false"/"1"/"0"/"yes"/"no". Dates accept ISO 8601, common locale formats, and Unix timestamps. Values that cannot be coerced are set to null and recorded as row-level errors.
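A minimal sketch of the boolean and number coercions described above (stripping thousands separators alongside currency symbols is an assumption; uncoercible values return null, which the caller records as a row-level error):

```typescript
// Sketch of two of the coercions; null means "uncoercible".
const TRUE_WORDS = new Set(["true", "1", "yes"]);
const FALSE_WORDS = new Set(["false", "0", "no"]);

function coerceBoolean(raw: string): boolean | null {
  const v = raw.trim().toLowerCase();
  if (TRUE_WORDS.has(v)) return true;
  if (FALSE_WORDS.has(v)) return false;
  return null;
}

function coerceNumber(raw: string): number | null {
  // Comma/whitespace stripping here is an assumption beyond "strip currency symbols".
  const stripped = raw.replace(/[$€£,\s]/g, "");
  const n = Number(stripped);
  return stripped !== "" && Number.isFinite(n) ? n : null;
}
```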
Attachment fields: If the provided field schema includes attachment-type fields, those fields are preserved in the dataset schema but no data is written for them on any row — attachment file references cannot be imported from flat files.
Partial Success
Row-level validation errors do not abort the import. All valid rows are inserted; invalid rows are collected in output.errors. The task completes successfully as long as at least one row was inserted. If zero rows are valid, the task fails.
Output Schema
{
"dataset": {
"id": "ds_abc123",
"name": "My Companies",
"description": "Imported from CSV",
"tags": ["import", "companies"],
"fields": [...]
},
"detectedFormat": "csv",
"fieldsSource": "inferred",
"fields": [...],
"summary": {
"totalRows": 850,
"imported": 847,
"failed": 3
},
"errors": [
{ "index": 5, "error": "row 5: Revenue — expected number, got 'N/A'" },
{ "index": 23, "error": "row 23: Email — expected valid email address, got 'not-an-email'" }
]
}
| Field | Description |
|---|---|
| dataset | Metadata for the newly created dataset: id, name, description, tags, and fields |
| detectedFormat | File format detected from the extension: "csv", "json", "ndjson", or "jsonl" |
| fieldsSource | "provided" if fields were passed in the input, "inferred" if they were auto-detected from the file |
| fields | The full field definition array used for the import |
| summary.totalRows | Total rows found in the file |
| summary.imported | Rows successfully inserted |
| summary.failed | Rows that failed validation or insertion |
| errors | Array of { index, error } objects for each failed row. Omitted when there are no errors. |
batch_update_datasets
Updates name, description, tags, and/or options on multiple datasets sequentially. The options field is merged with the existing value (shallow merge) rather than replaced wholesale. Cancellation is checked between items.
| Property | Value |
|---|---|
| Task Type | batch_update_datasets |
| Tables Used | tasks, task_events, datasets (update) |
Input Schema
{
"datasetUpdates": [
{
"id": "ds_abc123",
"name": "Renamed Dataset",
"description": "Updated description",
"tags": ["updated", "q1"],
"options": { "locked": true }
},
{
"id": "ds_def456",
"tags": ["archived"]
}
]
}
| Field | Type | Description |
|---|---|---|
| datasetUpdates | object[] | Array of update objects. Each must have an id. All other fields are optional — only provided fields are updated. |
| id | string | ID of the dataset to update. Required per item. |
| name | string? | New dataset name. |
| description | string? | New dataset description. |
| tags | string[]? | Replacement tag array. |
| options | object? | Partial options update. Merged with existing options using { ...existing, ...update.options }. |
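The merge is shallow, which matters when options holds nested objects; a sketch:

```typescript
// Sketch of the shallow merge: only top-level keys from the update replace
// existing keys; nested objects are not deep-merged.
function mergeOptions(
  existing: Record<string, unknown>,
  update?: Record<string, unknown>
): Record<string, unknown> {
  return update ? { ...existing, ...update } : existing;
}

// e.g. { locked: false, generated: { ... } } merged with { locked: true }
// yields { locked: true, generated: { ... } }: the generated key survives.
```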
Partial Success
If individual updates fail (e.g., dataset not found), they are recorded in output.errors and the rest continue. The task completes successfully if at least one update succeeded. If all updates fail, the task is marked as failed.
Output Schema
{
"updatedIds": ["ds_abc123"],
"updated": 1,
"errors": [
{ "id": "ds_def456", "error": "Dataset \"ds_def456\" not found" }
]
}
batch_delete_datasets
Deletes multiple datasets either as a soft-delete (sets deleted_at) or a permanent delete (hard DELETE that cascades to records, shares, and invitations). Cancellation is checked between items.
| Property | Value |
|---|---|
| Task Type | batch_delete_datasets |
| Tables Used | tasks, task_events, datasets (update or delete) |
Input Schema
{
"datasetIds": ["ds_abc123", "ds_def456", "ds_ghi789"],
"permanent": false
}
| Field | Type | Description |
|---|---|---|
| datasetIds | string[] | IDs of datasets to delete. Required. |
| permanent | boolean | If false (default), sets deleted_at = NOW() (soft delete). If true, issues a DELETE FROM datasets WHERE id = $1 which cascades to all child rows. |
Permanent deletion is irreversible. A hard delete cascades to dataset_records, dataset_shares, and invitations. To clean up attachment files from storage after a permanent delete, follow up with a cleanup_dataset_attachments task.
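A sketch of the two delete paths, assuming a node-postgres Pool. The hard-delete statement is the one quoted in the table above; the soft-delete SQL is an illustration of the same effect, including the documented updated_by write.

```typescript
import { Pool } from "pg";

const pool = new Pool();

// Sketch of the per-dataset branch inside the batch loop.
async function deleteDataset(id: string, permanent: boolean, updatedBy: string) {
  if (permanent) {
    // FK cascades remove dataset_records, dataset_shares, and invitations.
    await pool.query(`DELETE FROM datasets WHERE id = $1`, [id]);
  } else {
    // Soft delete: the row remains, but deleted_at IS NULL filters now hide it.
    await pool.query(
      `UPDATE datasets
          SET deleted_at = NOW(), updated_at = NOW(), updated_by = $2
        WHERE id = $1 AND deleted_at IS NULL`,
      [id, updatedBy]
    );
  }
}
```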
Partial Success
Datasets that are not found or fail to delete are recorded in output.errors. The task completes if at least one deletion succeeded; fails if all deletions failed.
Output Schema
{
"deletedIds": ["ds_abc123", "ds_def456"],
"deleted": 2,
"errors": [
{ "id": "ds_ghi789", "error": "Dataset \"ds_ghi789\" not found" }
]
}
duplicate_dataset
Creates a copy of a dataset with a new name. The new dataset is owned by task.created_by. Metadata is copied from the source but public and locked are always forced to false on the copy. Optionally copies all non-deleted records in pages of 500, with cancellation checks between pages.
| Property | Value |
|---|---|
| Task Type | duplicate_dataset |
| Tables Used | tasks, task_events, datasets (insert), dataset_shares (insert), dataset_records (read + insert) |
Input Schema
{
"sourceDatasetId": "ds_abc123",
"name": "Copy of My Dataset",
"description": "Optional override description",
"include_records": true
}
| Field | Type | Description |
|---|---|---|
| sourceDatasetId | string | ID of the dataset to copy. Required. Must not be soft-deleted. |
| name | string | Name for the new dataset. Required. |
| description | string? | Description for the new dataset. If omitted, the source description is copied. |
| include_records | boolean | If true, all non-deleted records from the source are copied. Defaults to false. |
Attachment fields: When copying records, any field of type attachment has its value reset to [] in the copy. File references stored in the source dataset's records are not transferred, since the attachment files are tied to the original dataset's storage prefix.
Record Paging
Records are copied in pages of 500 rows ordered by created_at ASC. After each page, the task emits a records_copied event and updates progress (10–99%). Cancellation is checked at the start of each page. If cancelled mid-copy, the partially-copied records remain in the new dataset.
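A sketch of the paging loop, with hypothetical helpers for the page copy, event emission, progress updates, and cancellation check:

```typescript
declare function copyPage(sourceId: string, targetId: string, offset: number, limit: number): Promise<number>;
declare function emitEvent(taskId: string, type: string, data: unknown): Promise<void>;
declare function setProgress(taskId: string, pct: number): Promise<void>;
declare function isTaskCancelled(taskId: string): Promise<boolean>;

const PAGE_SIZE = 500;

// Sketch: cancellation is checked before each page, and a cancelled copy
// keeps whatever pages already landed in the new dataset.
async function copyRecords(taskId: string, sourceId: string, targetId: string, total: number) {
  let copied = 0;
  while (copied < total) {
    if (await isTaskCancelled(taskId)) break; // partial copy is kept
    const n = await copyPage(sourceId, targetId, copied, PAGE_SIZE);
    if (n === 0) break; // defensive: avoid spinning if the source shrank mid-copy
    copied += n;
    await emitEvent(taskId, "records_copied", { copied, total });
    // Map copy progress onto the 10-99% band reserved for this phase.
    await setProgress(taskId, Math.min(99, 10 + Math.floor((copied / total) * 89)));
  }
  return copied;
}
```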
Output Schema
{
"datasetId": "ds_xyz999",
"name": "Copy of My Dataset",
"recordsCopied": 1247
}
cleanup_stale_attachments
Finds rows in pending_attachments older than a configurable TTL, deletes the associated file from object storage, then deletes the database row. This task is designed to run on a schedule (e.g., every hour) to prevent orphaned uploads from accumulating.
| Property | Value |
|---|---|
| Task Type | cleanup_stale_attachments |
| Tables Used | tasks, task_events, pending_attachments (select + delete) |
Input Schema
{
"ttlHours": 24
}
| Field | Type | Description |
|---|---|---|
| ttlHours | number? | Age threshold in hours. Rows with created_at < NOW() - (ttlHours * INTERVAL '1 hour') are deleted. Defaults to 24. Must be a positive number. |
Storage vs DB errors: Storage delete failures are logged as warnings but do not stop processing — the DB row is still deleted. The output deleted count reflects successful DB row deletions only.
Cancellation
Cancellation is checked between each row. If cancelled mid-run, already-deleted rows remain deleted.
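Putting the pieces together, a sketch of the whole run, assuming a node-postgres Pool and hypothetical deleteObject / isTaskCancelled helpers. The TTL predicate is the formula from the input table; storage failures only warn, and only DB row deletions are counted.

```typescript
import { Pool } from "pg";

const pool = new Pool();

declare function deleteObject(storageKey: string): Promise<void>;
declare function isTaskCancelled(taskId: string): Promise<boolean>;

async function cleanupStaleAttachments(taskId: string, ttlHours = 24) {
  const { rows } = await pool.query(
    `SELECT id, storage_key
       FROM pending_attachments
      WHERE created_at < NOW() - ($1 * INTERVAL '1 hour')`,
    [ttlHours]
  );
  let deleted = 0;
  for (const row of rows) {
    if (await isTaskCancelled(taskId)) break; // checked between rows
    try {
      await deleteObject(row.storage_key);
    } catch (err) {
      console.warn(`failed to delete ${row.storage_key} from storage`, err); // warn, keep going
    }
    await pool.query(`DELETE FROM pending_attachments WHERE id = $1`, [row.id]);
    deleted += 1; // counts DB row deletions only
  }
  return { deleted };
}
```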
Output Schema
{
"deleted": 12
}
cleanup_dataset_attachments
Lists and deletes all object storage files under the prefix attachments/{datasetId}/. This task is typically queued after a dataset is permanently deleted, to clean up its attachment files from storage. It does not query the database — the dataset rows are already gone by the time it runs.
| Property | Value |
|---|---|
| Task Type | cleanup_dataset_attachments |
| Tables Used | tasks, task_events (no dataset tables accessed) |
Input Schema
{
"datasetId": "ds_abc123"
}
| Field | Type | Description |
|---|---|---|
| datasetId | string | ID of the dataset whose attachment files should be deleted. Required. Used to construct the storage prefix attachments/{datasetId}/. |
No database queries: This handler only interacts with object storage — it lists all keys under the prefix and deletes them. Individual file deletion failures are logged as warnings but do not fail the task.
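A sketch of the storage-only flow, with hypothetical listObjects / deleteObject helpers:

```typescript
declare function listObjects(prefix: string): Promise<string[]>;
declare function deleteObject(key: string): Promise<void>;

// Sketch: list every key under the dataset's attachment prefix and delete
// each one, warning (not failing) on individual errors.
async function cleanupDatasetAttachments(datasetId: string) {
  const prefix = `attachments/${datasetId}/`;
  let deleted = 0;
  for (const key of await listObjects(prefix)) {
    try {
      await deleteObject(key);
      deleted += 1;
    } catch (err) {
      console.warn(`failed to delete ${key}`, err);
    }
  }
  return { deleted };
}
```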
Output Schema
{
"deleted": 38
}