Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ export const loadBlocksMetaSchema = z.function()
z.array(blockSchema.passthrough())
))

export const getLatestBlockSchema = z.function()
.returns(z.promise(z.number()))

// Process

export const findProcessSchema = z.function()
Expand Down
32 changes: 26 additions & 6 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Transform } from 'node:stream'

import { Resolved, fromPromise, of } from 'hyper-async'
import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, pipe, prop, reduce, uniqBy } from 'ramda'
import ms from 'ms'

import { mapFrom, parseTags } from '../utils.js'
import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js'
import { findBlocksSchema, getLatestBlockSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js'

export const toSeconds = (millis) => Math.floor(millis / 1000)

Expand Down Expand Up @@ -329,10 +329,11 @@ export function cronMessagesBetweenWith ({
}
}

function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks, getLatestBlock }) {
findBlocks = fromPromise(findBlocksSchema.implement(findBlocks))
saveBlocks = fromPromise(saveBlocksSchema.implement(saveBlocks))
loadBlocksMeta = fromPromise(loadBlocksMetaSchema.implement(loadBlocksMeta))
getLatestBlock = fromPromise(getLatestBlockSchema.implement(getLatestBlock))

return ({ min, maxTimestamp }) => {
/**
Expand All @@ -348,6 +349,25 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
.map((fromDb) => findMissingBlocksIn(fromDb, { min, maxTimestamp }))
.chain((missingRange) => {
if (!missingRange) return Resolved(fromDb)
const latestBlocksMatch = missingRange?.min === fromDb?.[fromDb?.length - 1]?.height
if (latestBlocksMatch) {
logger('Latest blocks match at height %d. Checking Arweave for latest block', missingRange.min)
return of()
.chain(getLatestBlock)
.chain((latestBlock) => {
if (latestBlock === missingRange?.min) {
logger('Latest block matches missing range min height %d. Bypassing GQL call', missingRange.min)
return Resolved(fromDb)
}
logger('Latest blocks do not match (arweave: %d, db: %d). Fetching missing blocks from gateway', latestBlock, missingRange.min)
return Rejected(missingRange)
})
}
return Rejected(missingRange)
})
.bichain((missingRange) => {
if (!missingRange) return Resolved(fromDb)
logger('Loading missing blocks within range of %j', missingRange)

/**
* Load any missing blocks within the determined range,
Expand All @@ -362,7 +382,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
*/
.chain((fromGateway) => saveBlocks(fromGateway).map(() => fromGateway))
.map((fromGateway) => mergeBlocks(fromDb, fromGateway))
})
}, Resolved)
})
}
}
Expand Down Expand Up @@ -498,10 +518,10 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
)
}

function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) {
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, getLatestBlock, logger }) {
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))

const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks })
const reconcileBlocks = reconcileBlocksWith({ logger, findBlocks, loadBlocksMeta, saveBlocks, getLatestBlock })

return (ctx) => of(ctx)
.chain(parseCrons)
Expand Down
28 changes: 22 additions & 6 deletions servers/cu/src/effects/ao-block.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import pMap from 'p-map'
import CircuitBreaker from 'opossum'

import { blockSchema } from '../domain/model.js'
import { backoff, okRes, strFromFetchError } from '../domain/utils.js'
import { backoff, strFromFetchError } from '../domain/utils.js'
import { BLOCKS_TABLE } from './db.js'

const blockDocSchema = z.object({
Expand Down Expand Up @@ -94,6 +94,15 @@ export function findBlocksWith ({ db }) {
}
}

export function getLatestBlockWith ({ ARWEAVE_URL }) {
return () => {
return of(ARWEAVE_URL)
.chain(fromPromise((url) => fetch(url).then(res => res.json())))
.map(path(['height']))
.toPromise()
}
}

/**
* @typedef Env2
* @property {fetch} fetch
Expand All @@ -108,7 +117,7 @@ export function findBlocksWith ({ db }) {
* @returns {LoadBlocksMeta}
*/
export function loadBlocksMetaWith ({
fetch, GRAPHQL_URLS, pageSize, logger, breakerOptions = {
fetch, GRAPHQL_URLS, pageSize, logger, gatewayCounter, breakerOptions = {
timeout: 10000, // 10 seconds timeout
errorThresholdPercentage: 50, // open circuit after 50% failures
resetTimeout: 15000, // attempt to close circuit after 15 seconds
Expand Down Expand Up @@ -167,7 +176,14 @@ export function loadBlocksMetaWith ({
},
retry
)
.then(okRes)
.then((res) => {
if (res.ok) {
gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'success' })
return res
}
gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'error' })
throw res
})
.catch(async (e) => {
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
Expand Down Expand Up @@ -264,8 +280,8 @@ export function loadBlocksMetaWith ({

return (args) =>
of(args)
.chain(fromPromise(({ min, maxTimestamp }) =>
fetchAllPages({ min, maxTimestamp })
.chain(fromPromise(({ min, maxTimestamp }) => {
return fetchAllPages({ min, maxTimestamp })
.then(prop('edges'))
.then(pluck('node'))
.then(map(block => ({
Expand All @@ -276,6 +292,6 @@ export function loadBlocksMetaWith ({
*/
timestamp: block.timestamp * 1000
})))
))
}))
.toPromise()
}
2 changes: 2 additions & 0 deletions servers/cu/src/effects/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ export function findLatestProcessMemoryWith ({
findFileCheckpointBefore,
findRecordCheckpointBefore,
address,
gatewayCounter,
queryGateway,
queryCheckpointGateway,
loadTransactionData,
Expand Down Expand Up @@ -1472,6 +1473,7 @@ export function saveCheckpointWith ({
readProcessMemoryFile,
queryCheckpointGateway,
queryGateway,
gatewayCounter,
hashWasmMemory,
buildAndSignDataItem,
uploadDataItem,
Expand Down
8 changes: 6 additions & 2 deletions servers/cu/src/effects/arweave.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export function buildAndSignDataItemWith ({ WALLET, createDataItem = createData
* @param {Env1} env
* @returns {LoadTransactionMeta}
*/
export function loadTransactionMetaWith ({ fetch, GRAPHQL_URL, logger }) {
export function loadTransactionMetaWith ({ gatewayCounter, fetch, GRAPHQL_URL, logger }) {
// TODO: create a dataloader and use that to batch load contracts

const GET_PROCESSES_QUERY = `
Expand Down Expand Up @@ -123,11 +123,15 @@ export function loadTransactionMetaWith ({ fetch, GRAPHQL_URL, logger }) {
.then(transactionConnectionSchema.parse)
.then(path(['data', 'transactions', 'edges', '0', 'node']))
.then((node) => {
if (node) return node
if (node) {
gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'success' })
return node
}
logger('Transaction "%s" was not found on gateway', id)
// TODO: better error handling
const err = new Error(`Transaction '${id}' not found on gateway`)
err.status = 404
gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'error' })
throw err
})
))
Expand Down
25 changes: 18 additions & 7 deletions servers/cu/src/effects/main.cu.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ export const createEffects = async (ctx) => {

const gauge = MetricsClient.gaugeWith({})

const evaluationCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_evaluations',
description: 'The total number of evaluations on a CU',
labelNames: ['stream_type', 'message_type', 'process_error']
})

const gatewayCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_graphql_queries',
description: 'The total number of GraphQL queries on a CU',
labelNames: ['query_name', 'result']
})

const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({
DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR,
readFile
Expand Down Expand Up @@ -216,6 +228,7 @@ export const createEffects = async (ctx) => {
const saveCheckpoint = AoProcessClient.saveCheckpointWith({
address,
readProcessMemoryFile,
gatewayCounter,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
Expand Down Expand Up @@ -281,12 +294,6 @@ export const createEffects = async (ctx) => {
const loadMemoryUsage = () => process.memoryUsage()
const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage()

const evaluationCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_evaluations',
description: 'The total number of evaluations on a CU',
labelNames: ['stream_type', 'message_type', 'process_error']
})

/**
* TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens
*/
Expand All @@ -299,7 +306,7 @@ export const createEffects = async (ctx) => {
const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL]

const common = (logger) => ({
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, gatewayCounter, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }),
findProcess: AoProcessClient.findProcessWith({ db, logger }),
Expand All @@ -311,6 +318,7 @@ export const createEffects = async (ctx) => {
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }),
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }),
address,
gatewayCounter,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }),
PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS,
Expand All @@ -330,12 +338,14 @@ export const createEffects = async (ctx) => {
logger
}),
evaluationCounter,
gatewayCounter,
// gasCounter,
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }),
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }),
findModule: AoModuleClient.findModuleWith({ db, logger }),
saveModule: AoModuleClient.saveModuleWith({ db, logger }),
Expand Down Expand Up @@ -366,6 +376,7 @@ export const createEffects = async (ctx) => {
loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }),
loadMessages: AoSuClient.loadMessagesWith({
gatewayCounter,
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
fetch: ctx.fetch,
pageSize: 1000,
Expand Down
25 changes: 18 additions & 7 deletions servers/cu/src/effects/main.hb.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ export const createEffects = async (ctx) => {

const gauge = MetricsClient.gaugeWith({})

const evaluationCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_evaluations',
description: 'The total number of evaluations on a CU',
labelNames: ['stream_type', 'message_type', 'process_error']
})

const gatewayCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_graphql_queries',
description: 'The total number of GraphQL queries on a CU',
labelNames: ['query_name', 'result']
})

const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({
DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR,
readFile
Expand Down Expand Up @@ -193,6 +205,7 @@ export const createEffects = async (ctx) => {
const saveCheckpoint = AoProcessClient.saveCheckpointWith({
address,
readProcessMemoryFile,
gatewayCounter,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
Expand Down Expand Up @@ -258,12 +271,6 @@ export const createEffects = async (ctx) => {
const loadMemoryUsage = () => process.memoryUsage()
const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage()

const evaluationCounter = MetricsClient.counterWith({})({
name: 'ao_process_total_evaluations',
description: 'The total number of evaluations on a CU',
labelNames: ['stream_type', 'message_type', 'process_error']
})

/**
* TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens
*/
Expand All @@ -276,7 +283,7 @@ export const createEffects = async (ctx) => {
const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL]

const common = (logger) => ({
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ gatewayCounter, fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }),
findProcess: AoProcessClient.findProcessWith({ db, logger }),
Expand All @@ -288,6 +295,7 @@ export const createEffects = async (ctx) => {
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }),
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }),
address,
gatewayCounter,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }),
PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS,
Expand All @@ -307,12 +315,14 @@ export const createEffects = async (ctx) => {
logger
}),
evaluationCounter,
gatewayCounter,
// gasCounter,
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }),
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }),
findModule: AoModuleClient.findModuleWith({ db, logger }),
saveModule: AoModuleClient.saveModuleWith({ db, logger }),
Expand Down Expand Up @@ -343,6 +353,7 @@ export const createEffects = async (ctx) => {
loadTimestamp: HbClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
loadProcess: HbClient.loadProcessWith({ fetch: ctx.fetch, logger }),
loadMessages: HbClient.loadMessagesWith({
gatewayCounter,
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
fetch: ctx.fetch,
pageSize: 1000,
Expand Down