Skip to content
8 changes: 1 addition & 7 deletions src/@types/PersistentStorage.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { AccessList } from './AccessList'
import type { BaseFileObject } from './fileObject.js'
export type { PersistentStorageObject } from './fileObject.js'
export type PersistentStorageType = 'localfs' | 's3'

export interface PersistentStorageLocalFSOptions {
Expand Down Expand Up @@ -33,9 +33,3 @@ export interface DockerMountObject {
Target: string
ReadOnly: boolean
}

export interface PersistentStorageObject extends BaseFileObject {
type: 'nodePersistentStorage'
bucketId: string
fileName: string
}
2 changes: 2 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export interface FileInfoCommand extends Command {
fileIndex?: number
file?: StorageObject
checksum?: boolean
// required only for nodePersistentStorage files, to gate on the bucket ACL
consumerAddress?: string
}
// group these 2
export interface DDOCommand extends Command {
Expand Down
20 changes: 19 additions & 1 deletion src/@types/fileObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ export interface FtpFileObject extends BaseFileObject {
url: string
}

export interface PersistentStorageObject extends BaseFileObject {
type: 'nodePersistentStorage'
bucketId: string
fileName: string
}

export type StorageObject =
| UrlFileObject
| IpfsFileObject
| ArweaveFileObject
| S3FileObject
| FtpFileObject
| PersistentStorageObject

export interface StorageReadable {
stream: Readable
Expand All @@ -68,7 +75,18 @@ export enum FileObjectType {
IPFS = 'ipfs',
ARWEAVE = 'arweave',
S3 = 's3',
FTP = 'ftp'
FTP = 'ftp',
NODE_PERSISTENT_STORAGE = 'nodePersistentStorage'
}

// Case-insensitive match for the persistent-storage type. getStorageClass routes on
// `type?.toLowerCase()`, so all guard checks must normalize too or a casing variant
// (e.g. "NodePersistentStorage") slips past the guard and is still routed as PS.
export function isPersistentStorageType(type: unknown): boolean {
return (
typeof type === 'string' &&
type.toLowerCase() === FileObjectType.NODE_PERSISTENT_STORAGE.toLowerCase()
)
}

export interface FileInfoRequest {
Expand Down
186 changes: 90 additions & 96 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ import { ValidateParams } from '../httpRoutes/validateCommands.js'
import { Service } from '@oceanprotocol/ddo-js'
import { getOceanTokenAddressForChain } from '../../utils/address.js'
import { dockerRegistryAuth, OceanNodeConfig } from '../../@types/OceanNode.js'
import { EncryptMethod } from '../../@types/fileObject.js'
import {
BaseFileObject,
EncryptMethod,
isPersistentStorageType
} from '../../@types/fileObject.js'
import { getAddress, ZeroAddress } from 'ethers'
import { AccessList } from '../../@types/AccessList.js'

Expand Down Expand Up @@ -1962,10 +1966,16 @@ export class C2DEngineDocker extends C2DEngine {
// persistent Storage: bind-mount bucket files into the job container (localfs backend)
for (const i in job.assets) {
const asset = job.assets[i]
if (!asset.fileObject || asset.fileObject.type !== 'nodePersistentStorage') {
// resolve the effective file object (plaintext, encrypted, or via documentId/serviceId)
const resolved = await resolveComputeFileObject(asset)
if (!resolved || !isPersistentStorageType(resolved.type)) {
// non persistent-storage assets are downloaded later, during uploadData
continue
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
const fo = asset.fileObject as { bucketId?: string; fileName?: string }
// keep `resolved` local — do NOT persist the decrypted bucketId/fileName back
// onto the job record. uploadData independently re-detects encrypted and
// DDO-derived persistent-storage assets and skips them.
const fo = resolved as unknown as { bucketId?: string; fileName?: string }
if (!fo.bucketId || !fo.fileName) {
CORE_LOGGER.error(
`Job ${job.jobId} asset ${i}: nodePersistentStorage requires bucketId and fileName`
Expand Down Expand Up @@ -2872,108 +2882,37 @@ export class C2DEngineDocker extends C2DEngine {
appendFileSync(configLogPath, `Writing raw algo code to ${fullAlgoPath}\n`)
writeFileSync(fullAlgoPath, job.algorithm.meta.rawcode)
} else {
// do we have a files object?
if (job.algorithm.fileObject) {
// is it unencrypted?
if (job.algorithm.fileObject.type) {
// we can get the storage directly
try {
storage = Storage.getStorageClass(job.algorithm.fileObject, config)
} catch (e) {
CORE_LOGGER.error(`Unable to get storage class for algorithm: ${e.message}`)
appendFileSync(
configLogPath,
`Unable to get storage class for algorithm: ${e.message}\n`
)
return {
status: C2DStatusNumber.AlgorithmProvisioningFailed,
statusText: C2DStatusText.AlgorithmProvisioningFailed
}
}
} else {
// ok, maybe we have this encrypted instead
CORE_LOGGER.info(
'algorithm file object seems to be encrypted, checking it...'
)
// 1. Decrypt the files object
try {
const decryptedFileObject = await decryptFilesObject(
job.algorithm.fileObject
)
storage = Storage.getStorageClass(decryptedFileObject, config)
} catch (e) {
CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`)
appendFileSync(
configLogPath,
`Unable to decrypt algorithm files object: ${e.message}\n`
)
return {
status: C2DStatusNumber.AlgorithmProvisioningFailed,
statusText: C2DStatusText.AlgorithmProvisioningFailed
}
}
}
} else {
// no files object, try to get information from documentId and serviceId
// resolve the effective algorithm file object (plaintext, encrypted, or via
// documentId/serviceId). All types — including nodePersistentStorage — are
// streamed uniformly through the Storage class (job.owner enforces the bucket
// ACL for persistent storage).
const resolved = await resolveComputeFileObject(job.algorithm)
if (!resolved) {
CORE_LOGGER.info(
'algorithm file object seems to be missing, checking "serviceId" and "documentId"...'
'Could not extract any files object from the compute algorithm, skipping...'
)
const { serviceId, documentId } = job.algorithm
appendFileSync(
configLogPath,
`Using ${documentId} and serviceId ${serviceId} to get algorithm files.\n`
'Could not extract any files object from the compute algorithm, skipping...\n'
)
// we can get it from this info
if (serviceId && documentId) {
const algoDdo = await new FindDdoHandler(
OceanNode.getInstance()
).findAndFormatDdo(documentId)
// 1. Get the service
const service: Service = AssetUtils.getServiceById(algoDdo, serviceId)
if (!service) {
CORE_LOGGER.error(
`Could not find service with ID ${serviceId} in DDO ${documentId}`
)
appendFileSync(
configLogPath,
`Could not find service with ID ${serviceId} in DDO ${documentId}\n`
)
return {
status: C2DStatusNumber.AlgorithmProvisioningFailed,
statusText: C2DStatusText.AlgorithmProvisioningFailed
}
}
try {
// 2. Decrypt the files object
const decryptedFileObject = await decryptFilesObject(service.files)
storage = Storage.getStorageClass(decryptedFileObject, config)
} catch (e) {
CORE_LOGGER.error(`Unable to decrypt algorithm files object: ${e.message}`)
appendFileSync(
configLogPath,
`Unable to decrypt algorithm files object: ${e.message}\n`
)
return {
status: C2DStatusNumber.AlgorithmProvisioningFailed,
statusText: C2DStatusText.AlgorithmProvisioningFailed
}
} else {
try {
storage = Storage.getStorageClass(resolved, config, job.owner)
} catch (e) {
CORE_LOGGER.error(`Unable to get storage class for algorithm: ${e.message}`)
appendFileSync(
configLogPath,
`Unable to get storage class for algorithm: ${e.message}\n`
)
return {
status: C2DStatusNumber.AlgorithmProvisioningFailed,
statusText: C2DStatusText.AlgorithmProvisioningFailed
}
}
}

if (storage) {
await pipeline(
(await storage.getReadableStream()).stream,
createWriteStream(fullAlgoPath)
)
} else {
CORE_LOGGER.info(
'Could not extract any files object from the compute algorithm, skipping...'
)
appendFileSync(
configLogPath,
'Could not extract any files object from the compute algorithm, skipping...\n'
)
}
}
} catch (e) {
Expand All @@ -3000,15 +2939,20 @@ export class C2DEngineDocker extends C2DEngine {
if (asset.fileObject) {
try {
if (asset.fileObject.type) {
if (asset.fileObject.type === 'nodePersistentStorage') {
// local storage is handled later, when we start the container and create the binds
if (isPersistentStorageType(asset.fileObject.type)) {
// persistent storage is handled during ConfiguringVolumes via bind mounts
continue
}
storage = Storage.getStorageClass(asset.fileObject, config)
} else {
CORE_LOGGER.info('asset file object seems to be encrypted, checking it...')
// get the encrypted bytes
let filesObject: any = await decryptFilesObject(asset.fileObject)
// persistent storage assets are bind-mounted during ConfiguringVolumes;
// skip download here even if the resolved object wasn't written back
if (isPersistentStorageType(filesObject?.type)) {
continue
}
filesObject = await this.addUserDataToFilesObject(filesObject, asset.userdata)
storage = Storage.getStorageClass(filesObject, config)
}
Expand Down Expand Up @@ -3045,6 +2989,10 @@ export class C2DEngineDocker extends C2DEngine {
const service: Service = AssetUtils.getServiceById(ddo, serviceId)
// 3. Decrypt the url
let decryptedFileObject = await decryptFilesObject(service.files)
// persistent storage assets are bind-mounted during ConfiguringVolumes
if (isPersistentStorageType(decryptedFileObject?.type)) {
continue
}
decryptedFileObject = await this.addUserDataToFilesObject(
decryptedFileObject,
asset.userdata
Expand Down Expand Up @@ -3442,6 +3390,52 @@ export class C2DEngineDocker extends C2DEngine {

// this uses the docker engine, but exposes only one env, the free one

/**
* Resolves the effective (decrypted) file object for a compute asset or algorithm.
* Handles the three ways a file object can arrive:
* 1. plaintext file object (already has a `type`)
* 2. encrypted file object (no `type`) -> decrypt it
* 3. no file object, only `documentId` + `serviceId` -> resolve the DDO and decrypt `service.files`
* Returns null if nothing could be resolved. Shared by the Docker provisioning path
* and the compute pre-checks (initialize/start) so persistent-storage ACL validation
* sees the same resolved object.
*/
export async function resolveComputeFileObject(
item: ComputeAsset | ComputeAlgorithm
): Promise<BaseFileObject | null> {
try {
if (item.fileObject) {
// plaintext: type is directly available
if ((item.fileObject as BaseFileObject).type) {
return item.fileObject as BaseFileObject
}
// encrypted: decrypt to reveal the type
return await decryptFilesObject(item.fileObject)
}
// no file object: try documentId + serviceId
const { documentId, serviceId } = item
if (documentId && serviceId) {
const ddo = await new FindDdoHandler(OceanNode.getInstance()).findAndFormatDdo(
documentId
)
if (!ddo) {
return null
}
const service: Service = AssetUtils.getServiceById(ddo, serviceId)
if (!service) {
return null
}
return await decryptFilesObject(service.files)
}
return null
} catch (e) {
CORE_LOGGER.error(
`Unable to resolve compute file object: ${e instanceof Error ? e.message : String(e)}`
)
return null
}
}

export function getAlgorithmImage(algorithm: ComputeAlgorithm, jobId: string): string {
if (!algorithm.meta || !algorithm.meta.container) {
return null
Expand Down
27 changes: 13 additions & 14 deletions src/components/core/compute/initialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import { sanitizeServiceFiles } from '../../../utils/util.js'
import { FindDdoHandler } from '../handler/ddoHandler.js'
import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js'
import { getNonceAsNumber } from '../utils/nonceHandler.js'
import { getAlgorithmImage } from '../../c2d/compute_engine_docker.js'
import {
getAlgorithmImage,
resolveComputeFileObject
} from '../../c2d/compute_engine_docker.js'

import { Credentials, DDOManager } from '@oceanprotocol/ddo-js'
import { checkCredentials } from '../../../utils/credentials.js'
Expand All @@ -39,10 +42,7 @@ import {
validateAlgoForDataset,
validateOutput
} from './utils.js'
import {
ensureConsumerAllowedForPersistentStorageLocalfsFileObject,
rejectPersistentStorageFileObjectOnAlgorithm
} from '../../persistentStorage/PersistentStorageFactory.js'
import { ensureConsumerAllowedForPersistentStorageLocalfsFileObject } from '../../persistentStorage/PersistentStorageFactory.js'

export class ComputeInitializeHandler extends CommandHandler {
validate(command: ComputeInitializeCommand): ValidateParams {
Expand Down Expand Up @@ -107,7 +107,8 @@ export class ComputeInitializeHandler extends CommandHandler {
task.algorithm.documentId,
task.algorithm.serviceId,
node,
config
config,
task.consumerAddress
)

const isRawCodeAlgorithm = task.algorithm.meta?.rawcode
Expand Down Expand Up @@ -224,17 +225,15 @@ export class ComputeInitializeHandler extends CommandHandler {
if (isValidOutput.status.httpStatus !== 200) {
return isValidOutput
}
const algoPersistentStorageBan = rejectPersistentStorageFileObjectOnAlgorithm(
task.algorithm.fileObject
)
if (algoPersistentStorageBan) {
return algoPersistentStorageBan
}
for (const dataset of task.datasets) {
for (const elem of [task.algorithm, ...task.datasets]) {
// resolve encrypted / documentId+serviceId references so persistent-storage ACL
// is validated here too (not only plaintext file objects)
const resolvedFileObject =
(await resolveComputeFileObject(elem)) ?? elem.fileObject
const psAccess = await ensureConsumerAllowedForPersistentStorageLocalfsFileObject(
node,
task.consumerAddress,
dataset.fileObject
resolvedFileObject
)
if (psAccess) {
return psAccess
Expand Down
Loading
Loading