Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions extension/src/kernel/KernelManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ function processOperation(
yield* datasources.updateTablePreview(notebookUri, operation);
break;
}
case "sql-table-list-preview": {
yield* datasources.updateTableListPreview(notebookUri, operation);
case "catalog-children-preview": {
yield* datasources.updateCatalogChildrenPreview(notebookUri, operation);
break;
}
case "data-column-preview": {
Expand All @@ -327,7 +327,6 @@ function processOperation(
case "reconnected":
case "reload":
case "secret-keys-result":
case "sql-schema-list-preview":
case "startup-logs":
case "storage-download-ready":
case "storage-entries":
Expand Down
132 changes: 90 additions & 42 deletions extension/src/panel/datasources/DatasourcesService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,35 @@ import { Effect, HashMap, SubscriptionRef } from "effect";

import type { NotebookId } from "../../schemas/MarimoNotebookDocument.ts";
import type {
CatalogChildrenPreviewNotification,
CatalogNode,
DataColumnPreviewNotification,
DataSourceConnectionsNotification,
DatasetsNotification,
SqlTableListPreviewNotification,
DataTableNode,
SqlTablePreviewNotification,
} from "../../types.ts";

/**
* Maps for efficient lookups in the datasource hierarchy:
* Connection -> Database -> Schema -> Table
* Connection -> Database -> CatalogTreeNode tree.
*/

/**
* Normalized catalog tree node. Container nodes (`schema`, `namespace`) carry
* their resolved `children`; `data_table` leaves carry their `table` payload.
*
* Deferred buckets on the wire (`children`/`tables` === null) are normalized to
* an empty `children` array — the panel renders what the kernel has discovered
* and does not itself drive lazy catalog fetches.
*/
export interface CatalogTreeNode {
kind: "schema" | "namespace" | "data_table";
name: string;
children: CatalogTreeNode[];
table: DataTableNode | null;
}

interface DataSourceConnectionMap {
// connection name -> connection data
connections: Map<
Expand All @@ -31,13 +48,7 @@ interface DataSourceConnectionMap {
name: string;
dialect: string;
engine: string | null;
schemas: Map<
string,
{
name: string;
tables: Map<string, DataTable>;
}
>;
children: CatalogTreeNode[];
}
>;
}
Expand Down Expand Up @@ -76,7 +87,7 @@ interface DatasetsMap {
* 1. Data source connections (data-source-connections operation)
* 2. Datasets (datasets operation)
* 3. SQL table previews (sql-table-preview operation)
* 4. SQL table list previews (sql-table-list-preview operation)
* 4. Catalog children previews (catalog-children-preview operation)
* 5. Data column previews (data-column-preview operation)
*
* Uses SubscriptionRef for reactive state management.
Expand All @@ -101,16 +112,49 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
HashMap.empty<NotebookId, Map<string, DataTable | null>>(),
);

// Track SQL table list previews: NotebookUri -> Map<request_id, tables[]>
const tableListPreviewsRef = yield* SubscriptionRef.make(
HashMap.empty<NotebookId, Map<string, DataTable[]>>(),
// Track catalog children previews: NotebookUri -> Map<request_id, nodes[]>
const catalogChildrenPreviewsRef = yield* SubscriptionRef.make(
HashMap.empty<NotebookId, Map<string, CatalogTreeNode[]>>(),
);

// Track column previews: NotebookUri -> Map<table_name, ColumnStats>
const columnPreviewsRef = yield* SubscriptionRef.make(
HashMap.empty<NotebookId, Map<string, unknown>>(),
);

/**
* Normalize a wire catalog node into the panel's `CatalogTreeNode`.
*
* Recurses through `schema.tables` and `namespace.children`, flattening a
* deferred (`null`) bucket to an empty child list.
*/
const normalizeCatalogNode = (node: CatalogNode): CatalogTreeNode => {
switch (node.kind) {
case "schema":
return {
kind: "schema",
name: node.name,
children: (node.tables ?? []).map(normalizeCatalogNode),
table: null,
};
case "namespace":
return {
kind: "namespace",
name: node.name,
children: (node.children ?? []).map(normalizeCatalogNode),
table: null,
};
default:
// data_table leaf
return {
kind: "data_table",
name: node.name,
children: [],
table: node,
};
}
};

/**
* Convert DataSourceConnection list to efficient map structure
*/
Expand All @@ -123,25 +167,11 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
const databasesMap = new Map();

for (const db of conn.databases) {
const schemasMap = new Map();

for (const schema of db.schemas) {
const tablesMap = new Map();
for (const table of schema.tables) {
tablesMap.set(table.name, table);
}

schemasMap.set(schema.name, {
name: schema.name,
tables: tablesMap,
});
}

databasesMap.set(db.name, {
name: db.name,
dialect: db.dialect,
engine: db.engine ?? null,
schemas: schemasMap,
children: (db.children ?? []).map(normalizeCatalogNode),
});
}

Expand Down Expand Up @@ -254,28 +284,46 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
},

/**
* Update SQL table list preview for a notebook
* Update catalog children preview for a notebook.
*
* Skips updates that carry an error so a transient failure doesn't
* erase previously cached children.
*/
updateTableListPreview(
updateCatalogChildrenPreview(
notebookUri: NotebookId,
operation: SqlTableListPreviewNotification,
operation: CatalogChildrenPreviewNotification,
) {
return Effect.gen(function* () {
yield* SubscriptionRef.update(tableListPreviewsRef, (map) => {
if (operation.error != null) {
yield* Effect.logWarning("Catalog children preview failed").pipe(
Effect.annotateLogs({
notebookUri,
request_id: operation.request_id,
error: operation.error,
}),
);
return;
}

const children = (operation.children ?? []).map(
normalizeCatalogNode,
);

yield* SubscriptionRef.update(catalogChildrenPreviewsRef, (map) => {
const existing = HashMap.get(map, notebookUri);
const previewMap =
existing._tag === "Some" ? existing.value : new Map();

previewMap.set(operation.request_id, operation.tables ?? []);
previewMap.set(operation.request_id, children);

return HashMap.set(map, notebookUri, previewMap);
});

yield* Effect.logTrace("Updated table list preview").pipe(
yield* Effect.logTrace("Updated catalog children preview").pipe(
Effect.annotateLogs({
notebookUri,
request_id: operation.request_id,
count: operation.tables?.length ?? 0,
count: children.length,
}),
);
});
Expand Down Expand Up @@ -345,11 +393,11 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
},

/**
* Get table list preview for a notebook and request ID
* Get catalog children preview for a notebook and request ID
*/
getTableListPreview(notebookUri: NotebookId, requestId: string) {
getCatalogChildrenPreview(notebookUri: NotebookId, requestId: string) {
return Effect.gen(function* () {
const map = yield* SubscriptionRef.get(tableListPreviewsRef);
const map = yield* SubscriptionRef.get(catalogChildrenPreviewsRef);
const previewMap = HashMap.get(map, notebookUri);
if (previewMap._tag === "Some") {
return previewMap.value.get(requestId) ?? [];
Expand Down Expand Up @@ -386,7 +434,7 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
yield* SubscriptionRef.update(tablePreviewsRef, (map) =>
HashMap.remove(map, notebookUri),
);
yield* SubscriptionRef.update(tableListPreviewsRef, (map) =>
yield* SubscriptionRef.update(catalogChildrenPreviewsRef, (map) =>
HashMap.remove(map, notebookUri),
);
yield* SubscriptionRef.update(columnPreviewsRef, (map) =>
Expand Down Expand Up @@ -427,12 +475,12 @@ export class DatasourcesService extends Effect.Service<DatasourcesService>()(
},

/**
* Stream of table list preview changes.
* Stream of catalog children preview changes.
*
* Emits the current value on subscription, then all subsequent changes.
*/
streamTableListPreviewsChanges() {
return tableListPreviewsRef.changes;
streamCatalogChildrenPreviewsChanges() {
return catalogChildrenPreviewsRef.changes;
},

/**
Expand Down
Loading
Loading