Datasets

Dataset-level task handlers — generating, importing, updating, deleting, duplicating, and cleaning up datasets and their attachments.

Overview

The worker supports seven dataset-level task types. Unlike ai_generate_records and ai_generate_fields, which operate on the contents and schema of an existing dataset, these tasks create, modify, or manage datasets themselves.

| Task Type | Description |
| --- | --- |
| ai_generate_datasets | Generate new datasets or enrich existing ones using AI |
| import_dataset | Create a dataset from an uploaded CSV, JSON, or NDJSON file |
| batch_update_datasets | Update name, description, tags, or options on multiple datasets |
| batch_delete_datasets | Soft-delete or permanently delete multiple datasets |
| duplicate_dataset | Copy a dataset's metadata and optionally all its records |
| cleanup_stale_attachments | Delete pending attachment files and DB rows older than a TTL |
| cleanup_dataset_attachments | Delete all attachment files for a deleted dataset from storage |

Database Tables

datasets Table

The primary table for dataset metadata. Dataset-level handlers read from and write to this table. The fields column holds the complete field schema as a JSONB array.

| Column | Type | Description |
| --- | --- | --- |
| id | varchar (UUID) | Primary key, auto-generated |
| name | varchar(255) | Dataset display name |
| description | text (nullable) | Human-readable description |
| tags | jsonb | Array of string tags |
| fields | jsonb | Array of FieldDefinition objects — see the Fields page |
| options | jsonb | Freeform metadata. Workers read options->>'locked' and options->>'public'. Handlers may write a generated sub-object to record what was AI-generated. |
| deleted_at | timestamp (nullable) | Soft-delete timestamp. Workers must check deleted_at IS NULL before operating on a dataset. |
| created_by | varchar | FK to users.id — dataset owner |
| created_at | timestamp | Auto-set on insert |
| updated_at | timestamp | Updated on every change |
| updated_by | varchar | FK to users.id — worker sets this on every write |

dataset_shares Table

Controls who has access to a dataset. When any handler creates a new dataset, it must immediately insert an owner share for task.created_by; without that share, the creating user cannot see the new dataset.

| Column | Type | Description |
| --- | --- | --- |
| dataset_id | varchar | FK to datasets.id |
| user_id | varchar | FK to users.id |
| role | varchar | Always 'owner' when created by a worker |
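The insert that creates the owner share can be sketched as a parameterized query. A minimal illustration — the `buildOwnerShareInsert` helper name and exact column list are assumptions, not the worker's actual code:

```typescript
// Hypothetical helper: builds the parameterized INSERT a handler would run
// immediately after creating a dataset, so task.created_by can see it.
function buildOwnerShareInsert(
  datasetId: string,
  userId: string,
): { sql: string; params: string[] } {
  return {
    sql: "INSERT INTO dataset_shares (dataset_id, user_id, role) VALUES ($1, $2, $3)",
    params: [datasetId, userId, "owner"], // workers always create the share as 'owner'
  };
}
```

Running this in the same transaction as the dataset insert avoids a window where the dataset exists but is invisible to its owner.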

pending_attachments Table

Tracks file uploads that are waiting to be associated with a record. Rows are written by the API server when a user uploads a file but hasn't yet saved the record. The cleanup_stale_attachments handler removes rows (and their storage files) that have been waiting too long.

| Column | Type | Description |
| --- | --- | --- |
| id | varchar (UUID) | Primary key |
| storage_key | varchar | Object storage key used to delete the file |
| created_at | timestamp | When the upload was staged |

ai_generate_datasets

Generates new dataset schemas via AI (generate mode) or updates the metadata of existing datasets (enrich mode). Both modes use a structured prompt; the AI-generated field definitions are post-processed by restructureAIFields, which assigns fld_ IDs and nests field options into a sub-object.

| Property | Value |
| --- | --- |
| Task Type | ai_generate_datasets |
| Tables Used | tasks, task_events, datasets (insert or update), dataset_shares (insert for generate) |
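The post-processing step can be pictured as follows. This is a simplified sketch of what a function like restructureAIFields does — the exact core-key list and ID format are assumptions:

```typescript
// Simplified sketch: give each AI-returned field a fld_ ID and nest any
// non-core keys under an options sub-object. Assumes a 12-hex-char ID
// suffix; the real implementation may differ.
type RawField = { name: string; type: string; [key: string]: unknown };
type Field = { id: string; name: string; type: string; options: Record<string, unknown> };

function restructureAIFields(raw: RawField[]): Field[] {
  return raw.map((f) => {
    const { name, type, ...rest } = f;
    const id =
      "fld_" +
      Array.from({ length: 12 }, () =>
        Math.floor(Math.random() * 16).toString(16),
      ).join("");
    return { id, name, type, options: rest }; // extra keys become options
  });
}
```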

Input Schema

```json
{
  "mode": "generate",
  "count": 3,
  "ai": {
    "prompt": "Generate SaaS product datasets with pricing and feature data",
    "model": "gpt-4o",
    "temperature": 0.7,
    "max_output_tokens": 16000
  }
}
```

```json
{
  "mode": "enrich",
  "datasetIds": ["ds_abc123", "ds_def456"],
  "ai": {
    "prompt": "Add tags and improve descriptions based on the existing fields",
    "model": "gpt-4o",
    "temperature": 0.5
  }
}
```

| Field | Type | Description |
| --- | --- | --- |
| mode | string | "generate" or "enrich". Defaults to "generate" if omitted. |
| count | number | Number of datasets to generate (generate mode only). Defaults to 1. |
| datasetIds | string[] | IDs of existing datasets to enrich (enrich mode only). Required for enrich mode. |
| ai.prompt | string? | User instruction. In generate mode, describes what datasets to create. In enrich mode, describes what to update. Falls back to auto-detected missing fields if omitted. |
| ai.model | string | Requested model (may be "auto"). Use task.metadata.resolvedModel as the authoritative value. |
| ai.temperature | number? | Sampling temperature (0–2). Defaults to 0.7. |
| ai.max_output_tokens | number? | Token limit for the AI call. Defaults to 16000. |

Generate Mode

A single AI call produces all count datasets at once using structured output (GENERATE_BASED_SCHEMA). Each dataset is then inserted into datasets and an owner share is created in dataset_shares. If an individual insert fails, that dataset is recorded with an error but the rest continue (partial success within generate mode).

Structured output: OpenAI provider uses jsonSchema structured output for guaranteed-valid JSON. Anthropic and Gemini use jsonMode. A JSON repair function handles any malformed responses.

Enrich Mode

Each dataset in datasetIds is enriched with a separate AI call (ENRICH_DATASET_SCHEMA). The current dataset state (name, description, tags, field names, and up to 5 sample records) is passed to the model. Only non-null fields returned by the model are applied. Cancellation is checked between datasets.

Output Schema

```json
{
  "datasetIds": ["ds_xyz001", "ds_xyz002"],
  "datasets": [
    {
      "id": "ds_xyz001",
      "name": "SaaS Pricing Data",
      "description": "Pricing plans for SaaS companies",
      "tags": ["saas", "pricing"],
      "fields": [...],
      "tokenUsage": { "input_tokens": 1200, "output_tokens": 900, "total_tokens": 2100 }
    }
  ],
  "tokenUsage": { "input_tokens": 1200, "output_tokens": 900, "total_tokens": 2100 }
}
```

| Field | Description |
| --- | --- |
| datasetIds | IDs of successfully created or updated datasets |
| datasets | Per-dataset result objects. Failed inserts include an error sub-object instead of an id. |
| tokenUsage | Accumulated token usage across all AI calls |

import_dataset

Downloads a file from object storage, parses it, creates a new dataset, and batch-inserts the rows as records. The file is always deleted from storage at the end, whether the import succeeds or fails.

| Property | Value |
| --- | --- |
| Task Type | import_dataset |
| Tables Used | tasks, task_events, datasets (insert), dataset_shares (insert), dataset_records (insert) |
| Row Limit | 10,000 rows maximum |

Input Schema

```json
{
  "fileKey": "imports/user_abc/companies.csv",
  "name": "My Companies",
  "description": "Imported from CSV",
  "tags": ["import", "companies"],
  "options": {},
  "fields": [
    { "id": "fld_a1b2c3d4e5f6", "name": "Company", "type": "text", "options": { "required": true } },
    { "id": "fld_b2c3d4e5f6g7", "name": "Revenue", "type": "number" }
  ]
}
```

| Field | Type | Description |
| --- | --- | --- |
| fileKey | string | Storage key of the uploaded file. Required. |
| name | string? | Dataset name. Defaults to "Untitled Dataset". |
| description | string? | Dataset description. Optional. |
| tags | string[]? | Tags for the dataset. Defaults to []. |
| options | object? | Freeform options stored on the dataset. Defaults to {}. |
| fields | FieldDefinition[]? | Explicit field schema. If omitted, fields are inferred from the file contents using type detection. |

Supported Formats

Format is detected from the file extension:

| Extension | Format | Notes |
| --- | --- | --- |
| .csv | CSV | First row is treated as headers. All values are strings — type coercion is applied during validation. |
| .json | JSON | Must be a top-level array of objects. |
| .ndjson, .jsonl | Newline-delimited JSON | One JSON object per line. Empty lines are skipped. |
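Extension-based detection is straightforward. A minimal sketch — the `detectFormat` name is an assumption:

```typescript
// Sketch of extension-based format detection. Unknown extensions are
// rejected up front rather than guessed at.
function detectFormat(fileKey: string): "csv" | "json" | "ndjson" | "jsonl" {
  const ext = fileKey.slice(fileKey.lastIndexOf(".") + 1).toLowerCase();
  switch (ext) {
    case "csv": return "csv";
    case "json": return "json";
    case "ndjson": return "ndjson";
    case "jsonl": return "jsonl";
    default:
      throw new Error(`Unsupported file extension: .${ext}`);
  }
}
```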

Type Inference

When fields is not provided, the worker samples all values in each column and infers the most specific matching type. The inference priority is: boolean → integer → number → email → url → date → array → object → text. Inferred attachment type results are filtered out defensively.
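The walk down the priority list can be sketched as follows. The predicates here are illustrative approximations of string-valued samples (array/object detection is omitted), not the worker's exact rules:

```typescript
// Simplified most-specific-type inference: each predicate must match every
// sampled value, and the first matching type in priority order wins.
const checks: Array<[string, (v: string) => boolean]> = [
  ["boolean", (v) => v === "true" || v === "false"],
  ["integer", (v) => /^-?\d+$/.test(v)],
  ["number",  (v) => v.trim() !== "" && !Number.isNaN(Number(v))],
  ["email",   (v) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(v)],
  ["url",     (v) => /^https?:\/\//.test(v)],
  ["date",    (v) => !Number.isNaN(Date.parse(v))],
];

function inferType(values: string[]): string {
  for (const [type, ok] of checks) {
    if (values.length > 0 && values.every(ok)) return type;
  }
  return "text"; // fallback when nothing more specific matches
}
```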

Type Coercion

During validation, values from CSV files (which are always strings) are coerced to their field type. Numbers strip currency symbols. Booleans accept "true"/"false"/"1"/"0"/"yes"/"no". Dates accept ISO 8601, common locale formats, and Unix timestamps. Values that cannot be coerced are set to null and recorded as row-level errors.
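Two of these coercions can be sketched directly; returning null models the "cannot be coerced" case described above. Details (the exact symbol list, the helper names) are assumptions:

```typescript
// Sketch of CSV string coercion for number and boolean field types.
// Returns null when the value cannot be coerced, matching the row-level
// error behavior described in the text.
function coerceNumber(v: string): number | null {
  const cleaned = v.replace(/[$€£,\s]/g, ""); // strip currency symbols and separators
  const n = Number(cleaned);
  return cleaned !== "" && !Number.isNaN(n) ? n : null;
}

function coerceBoolean(v: string): boolean | null {
  const s = v.trim().toLowerCase();
  if (["true", "1", "yes"].includes(s)) return true;
  if (["false", "0", "no"].includes(s)) return false;
  return null;
}
```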

Attachment fields: If the provided field schema includes attachment-type fields, those fields are preserved in the dataset schema but no data is written for them on any row — attachment file references cannot be imported from flat files.

Partial Success

Row-level validation errors do not abort the import. All valid rows are inserted; invalid rows are collected in output.errors. The task completes successfully as long as at least one row was inserted. If zero rows are valid, the task fails.

Output Schema

```json
{
  "dataset": {
    "id": "ds_abc123",
    "name": "My Companies",
    "description": "Imported from CSV",
    "tags": ["import", "companies"],
    "fields": [...]
  },
  "detectedFormat": "csv",
  "fieldsSource": "inferred",
  "fields": [...],
  "summary": {
    "totalRows": 850,
    "imported": 847,
    "failed": 3
  },
  "errors": [
    { "index": 5, "error": "row 5: Revenue — expected number, got 'N/A'" },
    { "index": 23, "error": "row 23: Email — expected valid email address, got 'not-an-email'" }
  ]
}
```

| Field | Description |
| --- | --- |
| dataset | Metadata for the newly created dataset: id, name, description, tags, and fields |
| detectedFormat | File format detected from the extension: "csv", "json", "ndjson", or "jsonl" |
| fieldsSource | "provided" if fields were passed in the input, "inferred" if they were auto-detected from the file |
| fields | The full field definition array used for the import |
| summary.totalRows | Total rows found in the file |
| summary.imported | Rows successfully inserted |
| summary.failed | Rows that failed validation or insertion |
| errors | Array of { index, error } objects for each failed row. Omitted when there are no errors. |

batch_update_datasets

Updates name, description, tags, and/or options on multiple datasets sequentially. The options field is merged with the existing value (shallow merge) rather than replaced wholesale. Cancellation is checked between items.

| Property | Value |
| --- | --- |
| Task Type | batch_update_datasets |
| Tables Used | tasks, task_events, datasets (update) |

Input Schema

```json
{
  "datasetUpdates": [
    {
      "id": "ds_abc123",
      "name": "Renamed Dataset",
      "description": "Updated description",
      "tags": ["updated", "q1"],
      "options": { "locked": true }
    },
    {
      "id": "ds_def456",
      "tags": ["archived"]
    }
  ]
}
```

| Field | Type | Description |
| --- | --- | --- |
| datasetUpdates | object[] | Array of update objects. Each must have an id. All other fields are optional — only provided fields are updated. |
| id | string | ID of the dataset to update. Required per item. |
| name | string? | New dataset name. |
| description | string? | New dataset description. |
| tags | string[]? | Replacement tag array. |
| options | object? | Partial options update. Merged with existing options using { ...existing, ...update.options }. |
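Because the merge is shallow, top-level keys from the update replace whole values; nested objects are not merged key-by-key. A small illustration:

```typescript
// Shallow options merge, as described in the table: { ...existing, ...update }.
// Nested objects are replaced wholesale, not deep-merged.
function mergeOptions(
  existing: Record<string, unknown>,
  update: Record<string, unknown>,
): Record<string, unknown> {
  return { ...existing, ...update };
}

const merged = mergeOptions(
  { locked: false, generated: { fields: true } },
  { locked: true },
);
// merged flips only `locked`; the untouched `generated` sub-object survives
```

A consequence worth noting: sending `options: { generated: {} }` would wipe the existing generated sub-object entirely, since the shallow merge overwrites that key.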

Partial Success

If individual updates fail (e.g., dataset not found), they are recorded in output.errors and the rest continue. The task completes successfully if at least one update succeeded. If all updates fail, the task is marked as failed.

Output Schema

```json
{
  "updatedIds": ["ds_abc123"],
  "updated": 1,
  "errors": [
    { "id": "ds_def456", "error": "Dataset \"ds_def456\" not found" }
  ]
}
```

batch_delete_datasets

Deletes multiple datasets either as a soft-delete (sets deleted_at) or a permanent delete (hard DELETE that cascades to records, shares, and invitations). Cancellation is checked between items.

| Property | Value |
| --- | --- |
| Task Type | batch_delete_datasets |
| Tables Used | tasks, task_events, datasets (update or delete) |

Input Schema

```json
{
  "datasetIds": ["ds_abc123", "ds_def456", "ds_ghi789"],
  "permanent": false
}
```

| Field | Type | Description |
| --- | --- | --- |
| datasetIds | string[] | IDs of datasets to delete. Required. |
| permanent | boolean | If false (default), sets deleted_at = NOW() (soft delete). If true, issues a DELETE FROM datasets WHERE id = $1 which cascades to all child rows. |

Permanent deletion is irreversible. A hard delete cascades to dataset_records, dataset_shares, and invitations. To clean up attachment files from storage after a permanent delete, follow up with a cleanup_dataset_attachments task.

Partial Success

Datasets that are not found or fail to delete are recorded in output.errors. The task completes if at least one deletion succeeded; fails if all deletions failed.

Output Schema

```json
{
  "deletedIds": ["ds_abc123", "ds_def456"],
  "deleted": 2,
  "errors": [
    { "id": "ds_ghi789", "error": "Dataset \"ds_ghi789\" not found" }
  ]
}
```

duplicate_dataset

Creates a copy of a dataset with a new name. The new dataset is owned by task.created_by. Metadata is copied from the source but public and locked are always forced to false on the copy. Optionally copies all non-deleted records in pages of 500, with cancellation checks between pages.

| Property | Value |
| --- | --- |
| Task Type | duplicate_dataset |
| Tables Used | tasks, task_events, datasets (insert), dataset_shares (insert), dataset_records (read + insert) |

Input Schema

```json
{
  "sourceDatasetId": "ds_abc123",
  "name": "Copy of My Dataset",
  "description": "Optional override description",
  "include_records": true
}
```

| Field | Type | Description |
| --- | --- | --- |
| sourceDatasetId | string | ID of the dataset to copy. Required. Must not be soft-deleted. |
| name | string | Name for the new dataset. Required. |
| description | string? | Description for the new dataset. If omitted, the source description is copied. |
| include_records | boolean | If true, all non-deleted records from the source are copied. Defaults to false. |

Attachment fields: When copying records, any field of type attachment has its value reset to [] in the copy. File references stored in the source dataset's records are not transferred, since the attachment files are tied to the original dataset's storage prefix.

Record Paging

Records are copied in pages of 500 rows ordered by created_at ASC. After each page, the task emits a records_copied event and updates progress (10–99%). Cancellation is checked at the start of each page. If cancelled mid-copy, the partially-copied records remain in the new dataset.
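The paging loop and the 10–99% progress band can be sketched as follows. `pageRecords` and `copyProgress` are hypothetical stand-ins — the real handler pages via SQL OFFSET/keyset queries, not an in-memory array:

```typescript
// Sketch of the page-by-page copy shape described above.
function* pageRecords<T>(records: T[], pageSize = 500): Generator<T[]> {
  for (let offset = 0; offset < records.length; offset += pageSize) {
    yield records.slice(offset, offset + pageSize);
  }
}

// Maps copy progress into the 10–99% band: 10% at the start of the copy,
// capped at 99% so completion is only reported by the task itself.
function copyProgress(copied: number, total: number): number {
  if (total === 0) return 99;
  return Math.min(99, 10 + Math.floor((copied / total) * 89));
}
```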

Output Schema

```json
{
  "datasetId": "ds_xyz999",
  "name": "Copy of My Dataset",
  "recordsCopied": 1247
}
```

cleanup_stale_attachments

Finds rows in pending_attachments older than a configurable TTL, deletes the associated file from object storage, then deletes the database row. This task is designed to run on a schedule (e.g., every hour) to prevent orphaned uploads from accumulating.

| Property | Value |
| --- | --- |
| Task Type | cleanup_stale_attachments |
| Tables Used | tasks, task_events, pending_attachments (select + delete) |

Input Schema

```json
{
  "ttlHours": 24
}
```

| Field | Type | Description |
| --- | --- | --- |
| ttlHours | number? | Age threshold in hours. Rows with created_at < NOW() - (ttlHours * INTERVAL '1 hour') are deleted. Defaults to 24. Must be a positive number. |
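The same age check, expressed as a predicate for illustration (the `isStale` name is an assumption; the handler actually pushes this condition into SQL, as shown in the table):

```typescript
// Staleness predicate mirroring the SQL condition:
// created_at < NOW() - (ttlHours * INTERVAL '1 hour')
function isStale(createdAt: Date, ttlHours: number, now: Date = new Date()): boolean {
  const cutoff = now.getTime() - ttlHours * 60 * 60 * 1000;
  return createdAt.getTime() < cutoff;
}
```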

Storage vs DB errors: Storage delete failures are logged as warnings but do not stop processing — the DB row is still deleted. The output deleted count reflects successful DB row deletions only.

Cancellation

Cancellation is checked between each row. If cancelled mid-run, already-deleted rows remain deleted.

Output Schema

```json
{
  "deleted": 12
}
```

cleanup_dataset_attachments

Lists and deletes all object storage files under the prefix attachments/{datasetId}/. This task is typically queued after a dataset is permanently deleted, to clean up its attachment files from storage. It does not query the database — the dataset rows are already gone by the time it runs.

| Property | Value |
| --- | --- |
| Task Type | cleanup_dataset_attachments |
| Tables Used | tasks, task_events (no dataset tables accessed) |

Input Schema

```json
{
  "datasetId": "ds_abc123"
}
```

| Field | Type | Description |
| --- | --- | --- |
| datasetId | string | ID of the dataset whose attachment files should be deleted. Required. Used to construct the storage prefix attachments/{datasetId}/. |

No database queries: This handler only interacts with object storage — it lists all keys under the prefix and deletes them. Individual file deletion failures are logged as warnings but do not fail the task.
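The list-and-delete loop has this general shape. The `StorageClient` interface is a hypothetical stand-in for the real object storage SDK (which is asynchronous and paginates listings):

```typescript
// Sketch of the storage-only cleanup loop: list every key under the
// dataset's attachment prefix and delete each one, tolerating per-file
// failures as the text describes.
interface StorageClient {
  list(prefix: string): string[];
  delete(key: string): void; // may throw for an individual file
}

function cleanupDatasetAttachments(storage: StorageClient, datasetId: string): number {
  const prefix = `attachments/${datasetId}/`;
  let deleted = 0;
  for (const key of storage.list(prefix)) {
    try {
      storage.delete(key);
      deleted++;
    } catch (err) {
      // per the text: individual failures are logged as warnings, not fatal
      console.warn(`failed to delete ${key}`, err);
    }
  }
  return deleted;
}
```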

Output Schema

```json
{
  "deleted": 38
}
```