Skip to content

Commit 917ca08

Browse files
feat(restate): add storage cleanup service for orphaned audio files (#2189)
- Add StorageCleanup service with cleanupOldFiles handler - Lists all files in audio-files bucket and deletes those older than cutoff - Add listFiles and listAllFiles functions to supabase.ts - Add logging to existing deletion catch block in stt-file.ts - Register new service in index.ts This addresses the issue where audio files could become orphaned in Supabase storage when: - Upload succeeds but pipeline start fails - Deletion silently fails - Restate workflow crashes before finally block - Provider callback never arrives Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: yujonglee <[email protected]>
1 parent 8a01bd2 commit 917ca08

File tree

4 files changed

+147
-4
lines changed

4 files changed

+147
-4
lines changed

apps/restate/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
22

33
import { type Env, envSchema } from "./env";
44
import { rateLimiter } from "./services/rate-limit";
5+
import { storageCleanup } from "./services/storage-cleanup";
56
import { sttFile } from "./services/stt-file";
67

78
export default {
89
fetch(request: Request, _env: Env, _ctx: ExecutionContext) {
910
const env = envSchema.parse(_env);
1011
return restate.createEndpointHandler({
11-
services: [rateLimiter, sttFile],
12+
services: [rateLimiter, sttFile, storageCleanup],
1213
...(env.RESTATE_IDENTITY_KEY
1314
? { identityKeys: [env.RESTATE_IDENTITY_KEY] }
1415
: {}),
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
2+
import { serde } from "@restatedev/restate-sdk-zod";
3+
import { z } from "zod";
4+
5+
import { type Env } from "../env";
6+
import { deleteFile, listAllFiles } from "../supabase";
7+
8+
const CleanupInput = z.object({
9+
cutoffHours: z.number().min(1).default(24),
10+
});
11+
12+
export type CleanupInputType = z.infer<typeof CleanupInput>;
13+
14+
const CleanupResult = z.object({
15+
deletedCount: z.number(),
16+
failedCount: z.number(),
17+
totalScanned: z.number(),
18+
errors: z.array(z.string()),
19+
});
20+
21+
export type CleanupResultType = z.infer<typeof CleanupResult>;
22+
23+
export const storageCleanup = restate.service({
24+
name: "StorageCleanup",
25+
handlers: {
26+
cleanupOldFiles: restate.handlers.handler(
27+
{ input: serde.zod(CleanupInput) },
28+
async (
29+
ctx: restate.Context,
30+
input: CleanupInputType,
31+
): Promise<CleanupResultType> => {
32+
const env = ctx.request().extraArgs[0] as Env;
33+
const cutoffMs = input.cutoffHours * 60 * 60 * 1000;
34+
const cutoffDate = new Date(Date.now() - cutoffMs);
35+
36+
const files = await ctx.run("list-files", () => listAllFiles(env));
37+
38+
let deletedCount = 0;
39+
let failedCount = 0;
40+
const errors: string[] = [];
41+
42+
for (const file of files) {
43+
const fileDate = new Date(file.created_at);
44+
if (fileDate < cutoffDate) {
45+
const filePath = file.name;
46+
try {
47+
await ctx.run(`delete-${filePath}`, () =>
48+
deleteFile(env, filePath),
49+
);
50+
deletedCount++;
51+
} catch (err) {
52+
failedCount++;
53+
const errorMsg =
54+
err instanceof Error ? err.message : "Unknown error";
55+
errors.push(`Failed to delete ${filePath}: ${errorMsg}`);
56+
if (errors.length >= 10) {
57+
errors.push("... (truncated, too many errors)");
58+
break;
59+
}
60+
}
61+
}
62+
}
63+
64+
return {
65+
deletedCount,
66+
failedCount,
67+
totalScanned: files.length,
68+
errors,
69+
};
70+
},
71+
),
72+
},
73+
});
74+
75+
export type StorageCleanup = typeof storageCleanup;

apps/restate/src/services/stt-file.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,16 @@ export const sttFile = restate.workflow({
8787
ctx.set("error", error);
8888
throw err;
8989
} finally {
90-
await ctx.run("cleanup", () =>
91-
deleteFile(env, input.fileId).catch(() => {}),
92-
);
90+
await ctx.run("cleanup", async () => {
91+
try {
92+
await deleteFile(env, input.fileId);
93+
} catch (err) {
94+
console.error("Failed to delete audio file from storage", {
95+
fileId: input.fileId,
96+
error: err instanceof Error ? err.message : String(err),
97+
});
98+
}
99+
});
93100
}
94101
},
95102
),

apps/restate/src/supabase.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,63 @@ export async function deleteFile(
7979
throw new Error(`Failed to delete file: ${response.status} ${body}`);
8080
}
8181
}
82+
83+
export interface StorageFile {
84+
name: string;
85+
id: string;
86+
created_at: string;
87+
updated_at: string;
88+
metadata: Record<string, unknown>;
89+
}
90+
91+
export async function listFiles(
92+
env: SupabaseEnv,
93+
prefix: string = "",
94+
limit: number = 100,
95+
offset: number = 0,
96+
): Promise<StorageFile[]> {
97+
const { url, serviceRoleKey } = getSupabaseConfig(env);
98+
99+
const response = await fetch(`${url}/storage/v1/object/list/audio-files`, {
100+
method: "POST",
101+
headers: {
102+
Authorization: `Bearer ${serviceRoleKey}`,
103+
apikey: serviceRoleKey,
104+
"Content-Type": "application/json",
105+
},
106+
body: JSON.stringify({
107+
prefix,
108+
limit,
109+
offset,
110+
}),
111+
});
112+
113+
if (!response.ok) {
114+
const body = await response.text();
115+
throw new Error(`Failed to list files: ${response.status} ${body}`);
116+
}
117+
118+
return (await response.json()) as StorageFile[];
119+
}
120+
121+
export async function listAllFiles(env: SupabaseEnv): Promise<StorageFile[]> {
122+
const allFiles: StorageFile[] = [];
123+
let offset = 0;
124+
const limit = 100;
125+
126+
while (true) {
127+
const files = await listFiles(env, "", limit, offset);
128+
if (files.length === 0) break;
129+
130+
for (const file of files) {
131+
if (file.id) {
132+
allFiles.push(file);
133+
}
134+
}
135+
136+
if (files.length < limit) break;
137+
offset += limit;
138+
}
139+
140+
return allFiles;
141+
}

0 commit comments

Comments
 (0)