Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions packages/d2mini/src/indexes.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
import { MultiSet } from './multiset.js'
import { DefaultMap, hash } from './utils.js'
import { DefaultMap } from './utils.js'

/**
* A map from a difference collection trace's keys -> (value, multiplicities) that changed.
* Used in operations like join and reduce where the operation needs to
* exploit the key-value structure of the data to run efficiently.
*/
export class Index<K, V> {
#inner: DefaultMap<K, DefaultMap<string, [V, number]>>
#inner: DefaultMap<K, Map<V, number>>

constructor() {
this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
() =>
new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
)
// #inner is as map of:
this.#inner = new DefaultMap<K, Map<V, number>>(() => new Map<V, number>())
// #inner is a map of:
// {
// [key]: {
// [hash(value)]: [value, multiplicity]
// }
// [key]: Map<V, number> // Direct value-to-multiplicity mapping
// }
}

Expand All @@ -32,14 +27,12 @@ export class Index<K, V> {

get(key: K): [V, number][] {
const valueMap = this.#inner.get(key)
return [...valueMap.values()]
return [...valueMap.entries()]
}

getMultiplicity(key: K, value: V): number {
const valueMap = this.#inner.get(key)
const valueHash = hash(value)
const [, multiplicity] = valueMap.get(valueHash)
return multiplicity
return valueMap.get(value) ?? 0
}

entries() {
Expand All @@ -61,31 +54,28 @@ export class Index<K, V> {
addValue(key: K, value: [V, number]): void {
const [val, multiplicity] = value
const valueMap = this.#inner.get(key)
const valueHash = hash(val)
const [, existingMultiplicity] = valueMap.get(valueHash)
const existingMultiplicity = valueMap.get(val) ?? 0
const newMultiplicity = existingMultiplicity + multiplicity

if (multiplicity !== 0) {
if (newMultiplicity === 0) {
valueMap.delete(valueHash)
valueMap.delete(val)
} else {
valueMap.set(valueHash, [val, newMultiplicity])
valueMap.set(val, newMultiplicity)
}
}
}

append(other: Index<K, V>): void {
for (const [key, otherValueMap] of other.entries()) {
const thisValueMap = this.#inner.get(key)
for (const [
valueHash,
[value, multiplicity],
] of otherValueMap.entries()) {
const [, existingMultiplicity] = thisValueMap.get(valueHash)
for (const [value, multiplicity] of otherValueMap.entries()) {
const existingMultiplicity = thisValueMap.get(value) ?? 0
const newMultiplicity = existingMultiplicity + multiplicity
if (newMultiplicity === 0) {
thisValueMap.delete(valueHash)
thisValueMap.delete(value)
} else {
thisValueMap.set(valueHash, [value, newMultiplicity])
thisValueMap.set(value, newMultiplicity)
}
}
}
Expand All @@ -100,7 +90,7 @@ export class Index<K, V> {
for (const [key, valueMap] of this.entries()) {
if (!other.has(key)) continue
const otherValues = other.get(key)
for (const [val1, mul1] of valueMap.values()) {
for (const [val1, mul1] of valueMap.entries()) {
for (const [val2, mul2] of otherValues) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
Expand All @@ -112,7 +102,7 @@ export class Index<K, V> {
for (const [key, otherValueMap] of other.entries()) {
if (!this.has(key)) continue
const values = this.get(key)
for (const [val2, mul2] of otherValueMap.values()) {
for (const [val2, mul2] of otherValueMap.entries()) {
for (const [val1, mul1] of values) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
Expand Down
102 changes: 101 additions & 1 deletion packages/d2mini/src/multiset.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { DefaultMap, chunkedArrayPush, hash } from './utils.js'
import {
DefaultMap,
chunkedArrayPush,
hash,
globalObjectIdGenerator,
} from './utils.js'

export type MultiSetArray<T> = [T, number][]
export type KeyedData<T> = [key: string, value: T]
Expand Down Expand Up @@ -66,6 +71,101 @@ export class MultiSet<T> {
* (record, multiplicity) pair.
*/
consolidate(): MultiSet<T> {
// Check if this looks like a keyed multiset (first item is a tuple of length 2)
if (this.#inner.length > 0) {
const firstItem = this.#inner[0][0]
if (Array.isArray(firstItem) && firstItem.length === 2) {
return this.#consolidateKeyed()
}
}

// Fall back to original method for unkeyed data
return this.#consolidateUnkeyed()
}

/**
* Private method for consolidating keyed multisets where keys are strings/numbers
* and values are compared by reference equality.
*
* This method provides significant performance improvements over the hash-based approach
* by using WeakMap for object reference tracking and avoiding expensive serialization.
*
* Special handling for join operations: When values are tuples of length 2 (common in joins),
* we unpack them and compare each element individually to maintain proper equality semantics.
*/
#consolidateKeyed(): MultiSet<T> {
const consolidated = new Map<string, number>()
const values = new Map<string, T>()

// Use global object ID generator for consistent reference equality

/**
* Special handler for tuples (arrays of length 2) commonly produced by join operations.
* Unpacks the tuple and generates an ID based on both elements to ensure proper
* consolidation of join results like ['A', null] and [null, 'X'].
*/
const getTupleId = (tuple: any[]): string => {
if (tuple.length !== 2) {
throw new Error('Expected tuple of length 2')
}
const [first, second] = tuple
return `${globalObjectIdGenerator.getStringId(first)}|${globalObjectIdGenerator.getStringId(second)}`
}

// Process each item in the multiset
for (const [data, multiplicity] of this.#inner) {
// Verify this is still a keyed item (should be [key, value] pair)
if (!Array.isArray(data) || data.length !== 2) {
// Found non-keyed item, fall back to unkeyed consolidation
return this.#consolidateUnkeyed()
}

const [key, value] = data

// Verify key is string or number as expected for keyed multisets
if (typeof key !== 'string' && typeof key !== 'number') {
// Found non-string/number key, fall back to unkeyed consolidation
return this.#consolidateUnkeyed()
}

// Generate value ID with special handling for join tuples
let valueId: string
if (Array.isArray(value) && value.length === 2) {
// Special case: value is a tuple from join operations
valueId = getTupleId(value)
} else {
// Regular case: use reference/value equality
valueId = globalObjectIdGenerator.getStringId(value)
}

// Create composite key and consolidate
const compositeKey = key + '|' + valueId
consolidated.set(
compositeKey,
(consolidated.get(compositeKey) || 0) + multiplicity,
)

// Store the original data for the first occurrence
if (!values.has(compositeKey)) {
values.set(compositeKey, data as T)
}
}

// Build result array, filtering out zero multiplicities
const result: MultiSetArray<T> = []
for (const [compositeKey, multiplicity] of consolidated) {
if (multiplicity !== 0) {
result.push([values.get(compositeKey)!, multiplicity])
}
}

return new MultiSet(result)
}

/**
* Private method for consolidating unkeyed multisets using the original approach.
*/
#consolidateUnkeyed(): MultiSet<T> {
const consolidated = new DefaultMap<string | number, number>(() => 0)
const values = new Map<string, any>()

Expand Down
64 changes: 21 additions & 43 deletions packages/d2mini/src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { StreamBuilder } from '../d2.js'
import { MultiSet } from '../multiset.js'
import { Index } from '../indexes.js'
import { hash } from '../utils.js'

/**
* Base operator for reduction operations (version-free)
Expand Down Expand Up @@ -45,73 +44,52 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
const currOut = this.#indexOut.get(key)
const out = this.#f(curr)

// Create maps for current and previous outputs
const newOutputMap = new Map<
string,
{ value: V2; multiplicity: number }
>()
const oldOutputMap = new Map<
string,
{ value: V2; multiplicity: number }
>()
// Create maps for current and previous outputs using values directly as keys
const newOutputMap = new Map<V2, number>()
const oldOutputMap = new Map<V2, number>()

// Process new output
for (const [value, multiplicity] of out) {
const valueKey = hash(value)
if (newOutputMap.has(valueKey)) {
newOutputMap.get(valueKey)!.multiplicity += multiplicity
} else {
newOutputMap.set(valueKey, { value, multiplicity })
}
const existing = newOutputMap.get(value) ?? 0
newOutputMap.set(value, existing + multiplicity)
}

// Process previous output
for (const [value, multiplicity] of currOut) {
const valueKey = hash(value)
if (oldOutputMap.has(valueKey)) {
oldOutputMap.get(valueKey)!.multiplicity += multiplicity
} else {
oldOutputMap.set(valueKey, { value, multiplicity })
}
const existing = oldOutputMap.get(value) ?? 0
oldOutputMap.set(value, existing + multiplicity)
}

const commonKeys = new Set<string>()

// First, emit removals for old values that are no longer present
for (const [valueKey, { value, multiplicity }] of oldOutputMap) {
const newEntry = newOutputMap.get(valueKey)
if (!newEntry) {
for (const [value, multiplicity] of oldOutputMap) {
if (!newOutputMap.has(value)) {
// Remove the old value entirely
result.push([[key, value], -multiplicity])
this.#indexOut.addValue(key, [value, -multiplicity])
} else {
commonKeys.add(valueKey)
}
}

// Then, emit additions for new values that are not present in old
for (const [valueKey, { value, multiplicity }] of newOutputMap) {
const oldEntry = oldOutputMap.get(valueKey)
if (!oldEntry) {
for (const [value, multiplicity] of newOutputMap) {
if (!oldOutputMap.has(value)) {
// Add the new value only if it has non-zero multiplicity
if (multiplicity !== 0) {
result.push([[key, value], multiplicity])
this.#indexOut.addValue(key, [value, multiplicity])
}
} else {
commonKeys.add(valueKey)
}
}

// Then, emit multiplicity changes for values that were present and are still present
for (const valueKey of commonKeys) {
const newEntry = newOutputMap.get(valueKey)
const oldEntry = oldOutputMap.get(valueKey)
const delta = newEntry!.multiplicity - oldEntry!.multiplicity
// Only emit actual changes, i.e. non-zero deltas
if (delta !== 0) {
result.push([[key, newEntry!.value], delta])
this.#indexOut.addValue(key, [newEntry!.value, delta])
// Finally, emit multiplicity changes for values that were present and are still present
for (const [value, newMultiplicity] of newOutputMap) {
const oldMultiplicity = oldOutputMap.get(value)
if (oldMultiplicity !== undefined) {
const delta = newMultiplicity - oldMultiplicity
// Only emit actual changes, i.e. non-zero deltas
if (delta !== 0) {
result.push([[key, value], delta])
this.#indexOut.addValue(key, [value, delta])
}
}
}
}
Expand Down
Loading