11import { EOL } from "os" ;
22import * as sdk from "@hasura/ndc-sdk-typescript"
3+ import { withActiveSpan } from "@hasura/ndc-sdk-typescript/instrumentation"
34import opentelemetry , { SpanStatusCode } from '@opentelemetry/api' ;
45import pLimit from "p-limit" ;
56import * as schema from "./schema"
@@ -15,6 +16,9 @@ export type RuntimeFunctions = {
1516// parallelism going on within a single query
1617const DEFAULT_PARALLEL_DEGREE = 10 ;
1718
19+ const FUNCTION_NAME_SPAN_ATTR_NAME = "ndc-lambda-sdk.function_name" ;
20+ const FUNCTION_INVOCATION_INDEX_SPAN_ATTR_NAME = "ndc-lambda-sdk.function_invocation_index" ;
21+
1822export async function executeQuery ( queryRequest : sdk . QueryRequest , functionsSchema : schema . FunctionsSchema , runtimeFunctions : RuntimeFunctions ) : Promise < sdk . QueryResponse > {
1923 const functionName = queryRequest . collection ;
2024
@@ -29,23 +33,36 @@ export async function executeQuery(queryRequest: sdk.QueryRequest, functionsSche
2933 if ( runtimeFunction === undefined )
3034 throw new sdk . InternalServerError ( `Couldn't find '${ functionName } ' function exported from hosted functions module.` )
3135
32- const functionInvocationPreparedArgs = ( queryRequest . variables ?? [ { } ] ) . map ( variables => {
33- const resolvedArgs = resolveArgumentValues ( queryRequest . arguments , variables )
34- return prepareArguments ( resolvedArgs , functionDefinition , functionsSchema . objectTypes ) ;
35- } ) ;
36+ const spanAttributes = { [ FUNCTION_NAME_SPAN_ATTR_NAME ] : functionName } ;
37+
38+ const functionInvocationPreparedArgs = withActiveSpan ( tracer , "prepare arguments" , ( ) =>
39+ ( queryRequest . variables ?? [ { } ] ) . map ( variables => {
40+ const resolvedArgs = resolveArgumentValues ( queryRequest . arguments , variables ) ;
41+ return prepareArguments ( resolvedArgs , functionDefinition , functionsSchema . objectTypes ) ;
42+ } )
43+ , spanAttributes ) ;
3644
3745 const parallelLimit = pLimit ( functionDefinition . parallelDegree ?? DEFAULT_PARALLEL_DEGREE ) ;
38- const functionInvocations : Promise < sdk . RowSet > [ ] = functionInvocationPreparedArgs . map ( invocationPreparedArgs => parallelLimit ( async ( ) => {
39- const result = await invokeFunction ( runtimeFunction , invocationPreparedArgs , functionName ) ;
40- const prunedResult = reshapeResultToNdcResponseValue ( result , functionDefinition . resultType , [ ] , queryRequest . query . fields ?? { } , functionsSchema . objectTypes ) ;
41- return {
42- aggregates : { } ,
43- rows : [
44- {
45- __value : prunedResult
46- }
47- ]
48- } ;
46+ const functionInvocations : Promise < sdk . RowSet > [ ] = functionInvocationPreparedArgs . map ( ( invocationPreparedArgs , invocationIndex ) => parallelLimit ( async ( ) => {
47+ const invocationSpanAttrs = { ...spanAttributes , [ FUNCTION_INVOCATION_INDEX_SPAN_ATTR_NAME ] : invocationIndex } ;
48+
49+ return withActiveSpan ( tracer , "function invocation" , async ( ) => {
50+ const result = await invokeFunction ( runtimeFunction , invocationPreparedArgs , functionName ) ;
51+
52+ const prunedResult = withActiveSpan ( tracer , "reshape result" , ( ) =>
53+ reshapeResultToNdcResponseValue ( result , functionDefinition . resultType , [ ] , queryRequest . query . fields ?? { } , functionsSchema . objectTypes )
54+ , invocationSpanAttrs ) ;
55+
56+ return {
57+ aggregates : { } ,
58+ rows : [
59+ {
60+ __value : prunedResult
61+ }
62+ ]
63+ } ;
64+ } , invocationSpanAttrs ) ;
65+
4966 } ) ) ;
5067
5168 return await Promise . all ( functionInvocations ) ;
@@ -74,13 +91,21 @@ async function executeMutationOperation(mutationOperation: sdk.MutationOperation
7491 throw new sdk . BadRequest ( `'${ functionName } ' is a '${ functionDefinition . ndcKind } ' and cannot be queried as a ${ schema . FunctionNdcKind . Procedure } .` )
7592 }
7693
94+ const spanAttributes = { [ FUNCTION_NAME_SPAN_ATTR_NAME ] : functionName } ;
95+
7796 const runtimeFunction = runtimeFunctions [ functionName ] ;
7897 if ( runtimeFunction === undefined )
7998 throw new sdk . InternalServerError ( `Couldn't find ${ functionName } function exported from hosted functions module.` )
8099
81- const preparedArgs = prepareArguments ( mutationOperation . arguments , functionDefinition , functionsSchema . objectTypes ) ;
100+ const preparedArgs = withActiveSpan ( tracer , "prepare arguments" , ( ) =>
101+ prepareArguments ( mutationOperation . arguments , functionDefinition , functionsSchema . objectTypes )
102+ , spanAttributes ) ;
103+
82104 const result = await invokeFunction ( runtimeFunction , preparedArgs , functionName ) ;
83- const prunedResult = reshapeResultToNdcResponseValue ( result , functionDefinition . resultType , [ ] , mutationOperation . fields ?? { } , functionsSchema . objectTypes ) ;
105+
106+ const prunedResult = withActiveSpan ( tracer , "reshape result" , ( ) =>
107+ reshapeResultToNdcResponseValue ( result , functionDefinition . resultType , [ ] , mutationOperation . fields ?? { } , functionsSchema . objectTypes )
108+ , spanAttributes ) ;
84109
85110 return {
86111 affected_rows : 1 ,
@@ -153,36 +178,26 @@ function coerceArgumentValue(value: unknown, type: schema.TypeReference, valuePa
153178}
154179
155180async function invokeFunction ( func : Function , preparedArgs : unknown [ ] , functionName : string ) : Promise < unknown > {
156- return tracer . startActiveSpan ( `Function: ${ functionName } ` , async ( span ) => {
157- span . setAttribute ( "ndc-lambda-sdk.function_name" , functionName ) ;
158- try {
181+ try {
182+ return await withActiveSpan ( tracer , `Function: ${ functionName } ` , async ( ) => {
159183 const result = func . apply ( undefined , preparedArgs ) ;
160184 // Await the result if it is a promise
161185 if ( typeof result === "object" && 'then' in result && typeof result . then === "function" ) {
162186 return await result ;
163187 }
164188 return result ;
165- } catch ( e ) {
166- if ( e instanceof sdk . ConnectorError ) {
167- span . recordException ( e ) ;
168- span . setStatus ( { code : SpanStatusCode . ERROR } ) ;
169- throw e ;
170- } else if ( e instanceof Error ) {
171- span . recordException ( e ) ;
172- span . setStatus ( { code : SpanStatusCode . ERROR } ) ;
173- throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` , getErrorDetails ( e ) ) ;
174- } else if ( typeof e === "string" ) {
175- span . recordException ( e ) ;
176- span . setStatus ( { code : SpanStatusCode . ERROR } ) ;
177- throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` , { message : e } ) ;
178- } else {
179- throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` ) ;
180- }
181- }
182- finally {
183- span . end ( ) ;
189+ } , { [ FUNCTION_NAME_SPAN_ATTR_NAME ] : functionName } ) ;
190+ } catch ( e ) {
191+ if ( e instanceof sdk . ConnectorError ) {
192+ throw e ;
193+ } else if ( e instanceof Error ) {
194+ throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` , getErrorDetails ( e ) ) ;
195+ } else if ( typeof e === "string" ) {
196+ throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` , { message : e } ) ;
197+ } else {
198+ throw new sdk . InternalServerError ( `Error encountered when invoking function '${ functionName } '` ) ;
184199 }
185- } )
200+ }
186201}
187202
188203export type ErrorDetails = {
0 commit comments