Skip to content
1 change: 1 addition & 0 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `AUTHORIZED_PUBLISHERS_LIST`: AccessList contract addresses (per chain). If present, Node will only index assets published by the accounts present on the given access lists. Example: `"{ \"8996\": [\"0x967da4048cD07aB37855c090aAF366e4ce1b9F48\",\"0x388C818CA8B9251b393131C08a736A67ccB19297\"] }"`
- `VALIDATE_UNSIGNED_DDO`: If set to `false`, the node will not validate unsigned DDOs and will request a signed message with the publisher address, nonce and signature. Default is `true`. Example: `false`
- `JWT_SECRET`: Secret used to sign JWT tokens. Default is `ocean-node-secret`. Example: `"my-secret-jwt-token"`
- `PERSISTENT_STORAGE`: Persistent storage config. See [persistent storage](persistentStorage.md).

## Database

Expand Down
4 changes: 4 additions & 0 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { FeeStrategy } from './Fees'
import { Schema } from '../components/database'
import { KeyProviderType } from './KeyManager'
import type { PersistentStorageConfig } from './PersistentStorage.js'
import type { AccessList } from './AccessList'

export interface OceanNodeDBConfig {
url: string | null
Expand Down Expand Up @@ -194,6 +195,9 @@ export interface OceanNodeStatus {
// detailed information
c2dClusters?: any[]
supportedSchemas?: Schema[]
persistentStorage?: {
accessLists?: AccessList[]
}
}

export interface FindDDOResponse {
Expand Down
27 changes: 18 additions & 9 deletions src/components/core/handler/persistentStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,24 @@ export class PersistentStorageCreateBucketHandler extends CommandHandler {
const storage = requirePersistentStorage(this)
const node = this.getOceanNode()
const config = node.getConfig()
const isAllowedCreate = await checkAddressOnAccessList(
task.consumerAddress,
config.persistentStorage?.accessLists,
node
)
if (!isAllowedCreate) {
return {
stream: null,
status: { httpStatus: 403, error: 'You are not allowed to create new buckets' }
// if we have access lists,check them.
if (
config.persistentStorage?.accessLists &&
config.persistentStorage?.accessLists.length > 0
) {
const isAllowedCreate = await checkAddressOnAccessList(
task.consumerAddress,
config.persistentStorage?.accessLists,
node
)
if (!isAllowedCreate) {
return {
stream: null,
status: {
httpStatus: 403,
error: 'You are not allowed to create new buckets'
}
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/components/core/utils/statusHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
StorageTypes,
OceanNodeConfig
} from '../../../@types/OceanNode.js'
import { getConfiguration } from '../../../utils/index.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { OceanNode } from '../../../OceanNode.js'
import { typesenseSchemas } from '../../database/TypesenseSchemas.js'
Expand Down Expand Up @@ -112,7 +111,7 @@ export async function status(
)
return
}
const config = await getConfiguration()
const config = oceanNode.getConfig()

// no previous status?
if (!nodeStatus) {
Expand Down Expand Up @@ -173,5 +172,11 @@ export async function status(
}
nodeStatus.supportedSchemas = typesenseSchemas.ddoSchemas
}

if (config.persistentStorage) {
nodeStatus.persistentStorage = {}
if (config.persistentStorage.accessLists)
nodeStatus.persistentStorage.accessLists = config.persistentStorage.accessLists
}
return nodeStatus
}
13 changes: 2 additions & 11 deletions src/components/httpRoutes/persistentStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ import {

export const persistentStorageRoutes = express.Router()

function readRawBody(req: any): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = []
req.on('data', (chunk: any) => chunks.push(Buffer.from(chunk)))
req.on('end', () => resolve(Buffer.concat(chunks)))
req.on('error', reject)
})
}

// Create bucket
persistentStorageRoutes.post(
`${SERVICES_API_BASE_PATH}/persistentStorage/buckets`,
Expand Down Expand Up @@ -144,7 +135,6 @@ persistentStorageRoutes.post(
`${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`,
async (req, res) => {
try {
const raw = await readRawBody(req)
const response = await new PersistentStorageUploadFileHandler(req.oceanNode).handle(
{
command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE,
Expand All @@ -153,7 +143,8 @@ persistentStorageRoutes.post(
nonce: req.query.nonce as string,
bucketId: req.params.bucketId,
fileName: req.params.fileName,
stream: Readable.from(raw),
// Stream request body directly (supports chunked uploads, avoids buffering).
stream: req,
authorization: req.headers?.authorization,
caller: req.caller
} as any
Expand Down
26 changes: 24 additions & 2 deletions src/components/persistentStorage/PersistentStorageLocalFS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
PersistentStorageFileInfo
} from './PersistentStorageFactory.js'
import { OceanNode } from '../../OceanNode.js'
import { CORE_LOGGER } from '../../utils/logging/common.js'

export class PersistentStorageLocalFS extends PersistentStorageFactory {
/* eslint-disable security/detect-non-literal-fs-filename -- localfs backend operates on filesystem paths */
Expand All @@ -29,7 +30,26 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory {
.options as PersistentStorageLocalFSOptions

this.baseFolder = options.folder
fsp.mkdir(this.baseFolder, { recursive: true })

// Ensure base folder exists and is a directory (sync to avoid startup races).
try {
fs.mkdirSync(this.baseFolder, { recursive: true })
const st = fs.statSync(this.baseFolder)
if (!st.isDirectory()) {
throw new Error(
`Persistent storage folder is not a directory: ${this.baseFolder}`
)
}
fs.mkdirSync(path.join(this.baseFolder, 'buckets'), { recursive: true })
} catch (e: any) {
if (e?.code === 'EACCES') {
throw new Error(
`Persistent storage folder is not accessible (EACCES): ${this.baseFolder}. ` +
`Configure 'persistentStorage.options.folder' to a writable path inside the container and mount it as a volume.`
)
}
throw e
}
}

private bucketPath(bucketId: string): string {
Expand Down Expand Up @@ -78,7 +98,9 @@ export class PersistentStorageLocalFS extends PersistentStorageFactory {
): Promise<CreateBucketResult> {
const bucketId = randomUUID()
const createdAt = Math.floor(Date.now() / 1000)
await fsp.mkdir(this.bucketPath(bucketId), { recursive: true })
const path = this.bucketPath(bucketId)
CORE_LOGGER.debug(`Creating ${path} folder for new bucket`)
await fsp.mkdir(path)
await super.dbUpsertBucket(
bucketId,
owner,
Expand Down
18 changes: 16 additions & 2 deletions src/test/integration/persistentStorage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import {
PersistentStorageListFilesHandler,
PersistentStorageUploadFileHandler
} from '../../components/core/handler/persistentStorage.js'
import { StatusHandler } from '../../components/core/handler/statusHandler.js'
import { OceanNode } from '../../OceanNode.js'
import type { AccessList } from '../../@types/AccessList.js'
import { ENVIRONMENT_VARIABLES, PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { getConfiguration } from '../../utils/config.js'
import { streamToObject } from '../../utils/util.js'
import { streamToObject, streamToString } from '../../utils/util.js'
import {
DEFAULT_TEST_TIMEOUT,
OverrideEnvConfig,
Expand All @@ -35,7 +36,7 @@ import { Blockchain } from '../../utils/blockchain.js'
import { RPCS, SupportedNetwork } from '../../@types/blockchain.js'
import { DEVELOPMENT_CHAIN_ID } from '../../utils/address.js'
import { deployAndGetAccessListConfig } from '../utils/contracts.js'
import { OceanNodeConfig } from '../../@types/OceanNode.js'
import { OceanNodeConfig, OceanNodeStatus } from '../../@types/OceanNode.js'
import { KeyManager } from '../../components/KeyManager/index.js'

describe('Persistent storage handlers (integration)', function () {
Expand Down Expand Up @@ -119,6 +120,19 @@ describe('Persistent storage handlers (integration)', function () {
// await fsp.rm(psRoot, { recursive: true, force: true })
})

it('should expose persistent storage access lists on node status', async () => {
const statusCommand = {
command: PROTOCOL_COMMANDS.STATUS,
node: oceanNode.getKeyManager().getPeerId().toString()
}
const response = await new StatusHandler(oceanNode).handle(statusCommand)
expect(response.status.httpStatus).to.equal(200)
const body = await streamToString(response.stream as Readable)
const nodeStatus = JSON.parse(body) as OceanNodeStatus
expect(nodeStatus.persistentStorage).to.be.an('object')
expect(nodeStatus.persistentStorage?.accessLists).to.be.an('array').with.lengthOf(1)
})

it('create bucket → upload → list → delete (happy path)', async () => {
const consumerAddress = await consumer.getAddress()
let nonce = Date.now().toString()
Expand Down
3 changes: 2 additions & 1 deletion src/utils/config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export const ENV_TO_CONFIG_MAPPING = {
P2P_ENABLE_NETWORK_STATS: 'p2pConfig.enableNetworkStats',
HTTP_CERT_PATH: 'httpCertPath',
HTTP_KEY_PATH: 'httpKeyPath',
ENABLE_BENCHMARK: 'enableBenchmark'
ENABLE_BENCHMARK: 'enableBenchmark',
PERSISTENT_STORAGE: 'persistentStorage'
} as const

// Configuration defaults
Expand Down
36 changes: 35 additions & 1 deletion src/utils/config/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,41 @@ export const OceanNodeConfigSchema = z
DB_PASSWORD: z.string().optional(),
DB_TYPE: z.string().optional(),
dbConfig: OceanNodeDBConfigSchema.optional(),
persistentStorage: PersistentStorageConfigSchema.optional(),
// Accept either an object (config file) or a JSON string (env var `PERSISTENT_STORAGE`),
// and validate the parsed value against the PersistentStorage schema.
persistentStorage: z
.preprocess((val) => {
if (val === undefined || val === null) return val
if (typeof val === 'string') {
const tryParse = (s: string) => {
try {
return JSON.parse(s)
} catch {
return undefined
}
}

// 1) Normal JSON string
const parsed = tryParse(val)
if (parsed !== undefined) {
// 2) Handle double-encoded JSON (e.g. "\"{...}\"")
if (typeof parsed === 'string') {
const parsedTwice = tryParse(parsed)
if (parsedTwice !== undefined) return parsedTwice
}
return parsed
}

// 3) Common docker-compose/shell mistake: single quotes inside JSON
const normalized = val.replace(/'/g, '"')
const parsedNormalized = tryParse(normalized)
if (parsedNormalized !== undefined) return parsedNormalized

return val
}
return val
}, PersistentStorageConfigSchema)
.optional(),

FEE_AMOUNT: z.string().optional(),
FEE_TOKENS: z.string().optional(),
Expand Down
5 changes: 5 additions & 0 deletions src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ export const ENVIRONMENT_VARIABLES: Record<any, EnvVariable> = {
name: 'HTTP_KEY_PATH',
value: process.env.HTTP_KEY_PATH,
required: false
},
PERSISTENT_STORAGE: {
name: 'PERSISTENT_STORAGE',
value: process.env.PERSISTENT_STORAGE,
required: false
}
}
export const CONNECTION_HISTORY_DELETE_THRESHOLD = 300
Expand Down
Loading