From 42c50a95f5edee2c393b0b73d721bc3066f88946 Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 10 Nov 2025 10:38:54 -0500 Subject: [PATCH 1/3] fix(cu): call block gateway less --- servers/cu/src/domain/dal.js | 3 +++ servers/cu/src/domain/lib/loadMessages.js | 32 ++++++++++++++++++----- servers/cu/src/effects/ao-block.js | 28 +++++++++++++++----- servers/cu/src/effects/ao-process.js | 2 ++ servers/cu/src/effects/arweave.js | 8 ++++-- servers/cu/src/effects/main.cu.js | 24 ++++++++++++----- 6 files changed, 76 insertions(+), 21 deletions(-) diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 73133ff98..3536df03f 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -48,6 +48,9 @@ export const loadBlocksMetaSchema = z.function() z.array(blockSchema.passthrough()) )) +export const getLatestBlockSchema = z.function() + .returns(z.promise(z.number())) + // Process export const findProcessSchema = z.function() diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index e088b1c53..e91794f15 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,11 +1,11 @@ import { Transform } from 'node:stream' -import { Resolved, fromPromise, of } from 'hyper-async' +import { Rejected, Resolved, fromPromise, of } from 'hyper-async' import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, pipe, prop, reduce, uniqBy } from 'ramda' import ms from 'ms' import { mapFrom, parseTags } from '../utils.js' -import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js' +import { findBlocksSchema, getLatestBlockSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js' export const toSeconds = (millis) => Math.floor(millis / 1000) @@ -329,10 +329,11 @@ export function cronMessagesBetweenWith ({ } } -function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { +function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks, getLatestBlock }) { findBlocks = fromPromise(findBlocksSchema.implement(findBlocks)) saveBlocks = fromPromise(saveBlocksSchema.implement(saveBlocks)) loadBlocksMeta = fromPromise(loadBlocksMetaSchema.implement(loadBlocksMeta)) + getLatestBlock = fromPromise(getLatestBlockSchema.implement(getLatestBlock)) return ({ min, maxTimestamp }) => { /** @@ -348,6 +349,25 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { .map((fromDb) => findMissingBlocksIn(fromDb, { min, maxTimestamp })) .chain((missingRange) => { if (!missingRange) return Resolved(fromDb) + const latestBlocksMatch = missingRange.min === fromDb[fromDb.length - 1].height + if (latestBlocksMatch) { + logger('Latest blocks match at height %d. Checking Arweave for latest block', missingRange.min) + return of() + .chain(getLatestBlock) + .chain((latestBlock) => { + if (latestBlock === missingRange.min) { + logger('Latest block matches missing range min height %d. Bypassing GQL call', missingRange.min) + return Resolved(fromDb) + } + logger('Latest blocks do not match (arweave: %d, db: %d). Fetching missing blocks from gateway', latestBlock, missingRange.min) + return Rejected(missingRange) + }) + } + return Rejected(missingRange) + }) + .bichain((missingRange) => { + if (!missingRange) return Resolved(fromDb) + logger('Loading missing blocks within range of %j', missingRange) /** * Load any missing blocks within the determined range, @@ -362,7 +382,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { */ .chain((fromGateway) => saveBlocks(fromGateway).map(() => fromGateway)) .map((fromGateway) => mergeBlocks(fromDb, fromGateway)) - }) + }, Resolved) }) } } @@ -498,10 +518,10 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { ) } -function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) { +function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, getLatestBlock, logger }) { loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp)) - const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks }) + const reconcileBlocks = reconcileBlocksWith({ logger, findBlocks, loadBlocksMeta, saveBlocks, getLatestBlock }) return (ctx) => of(ctx) .chain(parseCrons) diff --git a/servers/cu/src/effects/ao-block.js b/servers/cu/src/effects/ao-block.js index c9304fb60..eae09cd93 100644 --- a/servers/cu/src/effects/ao-block.js +++ b/servers/cu/src/effects/ao-block.js @@ -5,7 +5,7 @@ import pMap from 'p-map' import CircuitBreaker from 'opossum' import { blockSchema } from '../domain/model.js' -import { backoff, okRes, strFromFetchError } from '../domain/utils.js' +import { backoff, strFromFetchError } from '../domain/utils.js' import { BLOCKS_TABLE } from './db.js' const blockDocSchema = z.object({ @@ -94,6 +94,15 @@ export function findBlocksWith ({ db }) { } } +export function getLatestBlockWith ({ ARWEAVE_URL }) { + return () => { + return of(ARWEAVE_URL) + .chain(fromPromise((url) => fetch(url).then(res => res.json()))) + .map(path(['height'])) + .toPromise() + } +} + /** * @typedef Env2 * @property {fetch} fetch @@ -108,7 +117,7 @@ export function findBlocksWith ({ db }) { * @returns {LoadBlocksMeta} */ export function loadBlocksMetaWith ({ - fetch, GRAPHQL_URLS, pageSize, logger, breakerOptions = { + fetch, GRAPHQL_URLS, pageSize, logger, gatewayCounter, breakerOptions = { timeout: 10000, // 10 seconds timeout errorThresholdPercentage: 50, // open circuit after 50% failures resetTimeout: 15000, // attempt to close circuit after 15 seconds @@ -167,7 +176,14 @@ export function loadBlocksMetaWith ({ }, retry ) - .then(okRes) + .then((res) => { + if (res.ok) { + gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'success' }) + return res + } + gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'error' }) + throw res + }) .catch(async (e) => { logger( 'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'', @@ -264,8 +280,8 @@ export function loadBlocksMetaWith ({ return (args) => of(args) - .chain(fromPromise(({ min, maxTimestamp }) => - fetchAllPages({ min, maxTimestamp }) + .chain(fromPromise(({ min, maxTimestamp }) => { + return fetchAllPages({ min, maxTimestamp }) .then(prop('edges')) .then(pluck('node')) .then(map(block => ({ @@ -276,6 +292,6 @@ export function loadBlocksMetaWith ({ */ timestamp: block.timestamp * 1000 }))) - )) + })) .toPromise() } diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index 32caa4c79..8f1d2f560 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -828,6 +828,7 @@ export function findLatestProcessMemoryWith ({ findFileCheckpointBefore, findRecordCheckpointBefore, address, + gatewayCounter, queryGateway, queryCheckpointGateway, loadTransactionData, @@ -1472,6 +1473,7 @@ export function saveCheckpointWith ({ readProcessMemoryFile, queryCheckpointGateway, queryGateway, + gatewayCounter, hashWasmMemory, buildAndSignDataItem, uploadDataItem, diff --git a/servers/cu/src/effects/arweave.js b/servers/cu/src/effects/arweave.js index 7e3cb002d..3740a461c 100644 --- a/servers/cu/src/effects/arweave.js +++ b/servers/cu/src/effects/arweave.js @@ -50,7 +50,7 @@ export function buildAndSignDataItemWith ({ WALLET, createDataItem = createData * @param {Env1} env * @returns {LoadTransactionMeta} */ -export function loadTransactionMetaWith ({ fetch, GRAPHQL_URL, logger }) { +export function loadTransactionMetaWith ({ gatewayCounter, fetch, GRAPHQL_URL, logger }) { // TODO: create a dataloader and use that to batch load contracts const GET_PROCESSES_QUERY = ` @@ -123,11 +123,15 @@ export function loadTransactionMetaWith ({ fetch, GRAPHQL_URL, logger }) { .then(transactionConnectionSchema.parse) .then(path(['data', 'transactions', 'edges', '0', 'node'])) .then((node) => { - if (node) return node + if (node) { + gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'success' }) + return node + } logger('Transaction "%s" was not found on gateway', id) // TODO: better error handling const err = new Error(`Transaction '${id}' not found on gateway`) err.status = 404 + gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'error' }) throw err }) )) diff --git a/servers/cu/src/effects/main.cu.js b/servers/cu/src/effects/main.cu.js index ad554ec15..6292a2097 100644 --- a/servers/cu/src/effects/main.cu.js +++ b/servers/cu/src/effects/main.cu.js @@ -162,6 +162,18 @@ export const createEffects = async (ctx) => { const gauge = MetricsClient.gaugeWith({}) + const evaluationCounter = MetricsClient.counterWith({})({ + name: 'ao_process_total_evaluations', + description: 'The total number of evaluations on a CU', + labelNames: ['stream_type', 'message_type', 'process_error'] + }) + + const gatewayCounter = MetricsClient.counterWith({})({ + name: 'ao_process_total_graphql_queries', + description: 'The total number of GraphQL queries on a CU', + labelNames: ['query_name', 'result'] + }) + const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({ DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR, readFile @@ -216,6 +228,7 @@ export const createEffects = async (ctx) => { const saveCheckpoint = AoProcessClient.saveCheckpointWith({ address, readProcessMemoryFile, + gatewayCounter, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }), queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }), hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }), @@ -281,12 +294,6 @@ export const createEffects = async (ctx) => { const loadMemoryUsage = () => process.memoryUsage() const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage() - const evaluationCounter = MetricsClient.counterWith({})({ - name: 'ao_process_total_evaluations', - description: 'The total number of evaluations on a CU', - labelNames: ['stream_type', 'message_type', 'process_error'] - }) - /** * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens */ @@ -299,7 +306,7 @@ export const createEffects = async (ctx) => { const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL] const common = (logger) => ({ - loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), + loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, gatewayCounter, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }), isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }), findProcess: AoProcessClient.findProcessWith({ db, logger }), @@ -311,6 +318,7 @@ export const createEffects = async (ctx) => { findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }), findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }), address, + gatewayCounter, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }), PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS, @@ -330,12 +338,14 @@ export const createEffects = async (ctx) => { logger }), evaluationCounter, + gatewayCounter, // gasCounter, saveProcess: AoProcessClient.saveProcessWith({ db, logger }), findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }), saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }), findBlocks: AoBlockClient.findBlocksWith({ db, logger }), saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }), + getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }), loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }), findModule: AoModuleClient.findModuleWith({ db, logger }), saveModule: AoModuleClient.saveModuleWith({ db, logger }), From 84df805a5fb5b4dbc78e91a651971c54b7df4b77 Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 10 Nov 2025 10:40:38 -0500 Subject: [PATCH 2/3] fix(cu): add optional chaining --- servers/cu/src/domain/lib/loadMessages.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index e91794f15..eaca1fffd 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -349,13 +349,13 @@ function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks, .map((fromDb) => findMissingBlocksIn(fromDb, { min, maxTimestamp })) .chain((missingRange) => { if (!missingRange) return Resolved(fromDb) - const latestBlocksMatch = missingRange.min === fromDb[fromDb.length - 1].height + const latestBlocksMatch = missingRange?.min === fromDb?.[fromDb?.length - 1]?.height if (latestBlocksMatch) { logger('Latest blocks match at height %d. Checking Arweave for latest block', missingRange.min) return of() .chain(getLatestBlock) .chain((latestBlock) => { - if (latestBlock === missingRange.min) { + if (latestBlock === missingRange?.min) { logger('Latest block matches missing range min height %d. Bypassing GQL call', missingRange.min) return Resolved(fromDb) } From 5431d2b3369e50d45ba74c09b20424f821288cab Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 10 Nov 2025 15:36:03 -0500 Subject: [PATCH 3/3] fix(cu): hb imports --- servers/cu/src/effects/main.cu.js | 1 + servers/cu/src/effects/main.hb.js | 25 ++++++++++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/servers/cu/src/effects/main.cu.js b/servers/cu/src/effects/main.cu.js index 6292a2097..e3ec6c52b 100644 --- a/servers/cu/src/effects/main.cu.js +++ b/servers/cu/src/effects/main.cu.js @@ -376,6 +376,7 @@ export const createEffects = async (ctx) => { loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }), loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }), loadMessages: AoSuClient.loadMessagesWith({ + gatewayCounter, hashChain: (...args) => hashChainWorker.exec('hashChain', args), fetch: ctx.fetch, pageSize: 1000, diff --git a/servers/cu/src/effects/main.hb.js b/servers/cu/src/effects/main.hb.js index 51d116532..ceae20ced 100644 --- a/servers/cu/src/effects/main.hb.js +++ b/servers/cu/src/effects/main.hb.js @@ -139,6 +139,18 @@ export const createEffects = async (ctx) => { const gauge = MetricsClient.gaugeWith({}) + const evaluationCounter = MetricsClient.counterWith({})({ + name: 'ao_process_total_evaluations', + description: 'The total number of evaluations on a CU', + labelNames: ['stream_type', 'message_type', 'process_error'] + }) + + const gatewayCounter = MetricsClient.counterWith({})({ + name: 'ao_process_total_graphql_queries', + description: 'The total number of GraphQL queries on a CU', + labelNames: ['query_name', 'result'] + }) + const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({ DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR, readFile @@ -193,6 +205,7 @@ export const createEffects = async (ctx) => { const saveCheckpoint = AoProcessClient.saveCheckpointWith({ address, readProcessMemoryFile, + gatewayCounter, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }), queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }), hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }), @@ -258,12 +271,6 @@ export const createEffects = async (ctx) => { const loadMemoryUsage = () => process.memoryUsage() const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage() - const evaluationCounter = MetricsClient.counterWith({})({ - name: 'ao_process_total_evaluations', - description: 'The total number of evaluations on a CU', - labelNames: ['stream_type', 'message_type', 'process_error'] - }) - /** * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens */ @@ -276,7 +283,7 @@ export const createEffects = async (ctx) => { const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL] const common = (logger) => ({ - loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), + loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ gatewayCounter, fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }), isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }), findProcess: AoProcessClient.findProcessWith({ db, logger }), @@ -288,6 +295,7 @@ export const createEffects = async (ctx) => { findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }), findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }), address, + gatewayCounter, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }), PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS, @@ -307,12 +315,14 @@ export const createEffects = async (ctx) => { logger }), evaluationCounter, + gatewayCounter, // gasCounter, saveProcess: AoProcessClient.saveProcessWith({ db, logger }), findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }), saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }), findBlocks: AoBlockClient.findBlocksWith({ db, logger }), saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }), + getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }), loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }), findModule: AoModuleClient.findModuleWith({ db, logger }), saveModule: AoModuleClient.saveModuleWith({ db, logger }), @@ -343,6 +353,7 @@ export const createEffects = async (ctx) => { loadTimestamp: HbClient.loadTimestampWith({ fetch: ctx.fetch, logger }), loadProcess: HbClient.loadProcessWith({ fetch: ctx.fetch, logger }), loadMessages: HbClient.loadMessagesWith({ + gatewayCounter, hashChain: (...args) => hashChainWorker.exec('hashChain', args), fetch: ctx.fetch, pageSize: 1000,