Skip to content

Commit 7c7e116

Browse files
committed
groupWithin?
1 parent 8e85ce8 commit 7c7e116

File tree

9 files changed

+98
-37
lines changed

9 files changed

+98
-37
lines changed

modules/service/src/main/scala/lucuma/odb/graphql/mapping/SubscriptionMapping.scala

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ package lucuma.odb.graphql
55

66
package mapping
77

8+
import cats.Eq
89
import cats.data.Nested
10+
import cats.effect.kernel.Temporal
911
import cats.syntax.all.*
12+
import fs2.concurrent.Topic
1013
import fs2.Stream
1114
import grackle.Env
1215
import grackle.Query
@@ -34,9 +37,13 @@ import lucuma.odb.graphql.predicate.Predicates
3437
import lucuma.odb.instances.given
3538
import org.tpolecat.typename.TypeName
3639

40+
import scala.concurrent.duration.*
3741
import scala.reflect.ClassTag
3842

39-
trait SubscriptionMapping[F[_]] extends Predicates[F] {
43+
import SubscriptionMapping.groupingSubscribe
44+
45+
trait SubscriptionMapping[F[_]: Temporal] extends Predicates[F] {
46+
4047

4148
def topics: Topics[F]
4249
def user: User
@@ -89,7 +96,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
8996
SubscriptionField("executionEventAdded", ExecutionEventAddedInput.Binding.Option): (input, child) =>
9097
topics
9198
.executionEvent
92-
.subscribe(1024)
99+
.groupingSubscribe()
93100
.filter: e =>
94101
e.canRead(user) &&
95102
input.flatMap(_.programId).forall(_ === e.programId) &&
@@ -103,7 +110,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
103110
SubscriptionField("programEdit", ProgramEditInput.Binding.Option) { (input, child) =>
104111
topics
105112
.program
106-
.subscribe(1024)
113+
.groupingSubscribe()
107114
.filter(e => e.canRead(user) && input.flatMap(_.programId).forall(_ === e.programId))
108115
.map(e => Result(
109116
Environment(
@@ -117,7 +124,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
117124
SubscriptionField("observationEdit", ObservationEditInput.Binding.Option) { (input, child) =>
118125
topics
119126
.observation
120-
.subscribe(1024)
127+
.groupingSubscribe()
121128
.filter { e =>
122129
e.canRead(user) && ((
123130
input.flatMap(_.programId).forall(_ === e.programId) &&
@@ -154,7 +161,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
154161
SubscriptionField("configurationRequestEdit", ConfigurationRequestEditInput.Binding.Option) { (input, child) =>
155162
topics
156163
.configurationRequest
157-
.subscribe(1024)
164+
.groupingSubscribe()
158165
.filter { e =>
159166
e.canRead(user) && ((
160167
input.flatMap(_.programId).forall(_ === e.programId)
@@ -190,7 +197,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
190197
SubscriptionField("targetEdit", TargetEditInput.Binding.Option) { (input, child) =>
191198
topics
192199
.target
193-
.subscribe(1024)
200+
.groupingSubscribe()
194201
.filter { e =>
195202
e.canRead(user) &&
196203
input.flatMap(_.programId).forall(_ === e.programId) &&
@@ -226,7 +233,7 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
226233
SubscriptionField("groupEdit", GroupEditInput.Binding.Option) { (input, child) =>
227234
topics
228235
.group
229-
.subscribe(1024)
236+
.groupingSubscribe()
230237
.filter { e =>
231238
e.canRead(user) &&
232239
input.flatMap(_.programId).forall(_ === e.programId) &&
@@ -269,3 +276,18 @@ trait SubscriptionMapping[F[_]] extends Predicates[F] {
269276

270277
}
271278

279+
object SubscriptionMapping:
280+
val MaxQueued: Int = 1024
281+
val GroupingChunkSize: Int = 10
282+
val GroupingTimeout: FiniteDuration = 100.millis
283+
284+
extension [F[_]: Temporal, E: Eq](topic: Topic[F, E])
285+
def groupingSubscribe(
286+
maxQueued: Int = MaxQueued,
287+
chunkSize: Int = GroupingChunkSize,
288+
timeout: FiniteDuration = GroupingTimeout
289+
): Stream[F, E] =
290+
topic
291+
.subscribe(maxQueued)
292+
.groupWithin(chunkSize, timeout)
293+
.flatMap(Stream.chunk(_).changes)

modules/service/src/main/scala/lucuma/odb/graphql/topic/ConfigurationRequestTopic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -23,7 +25,7 @@ object ConfigurationRequestTopic:
2325
programId: Program.Id,
2426
editType: EditType,
2527
users: List[User.Id]
26-
) extends TopicElement
28+
) extends TopicElement derives Eq
2729

2830
private val topic =
2931
OdbTopic.define[(ConfigurationRequest.Id, Program.Id, EditType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/ExecutionEventAddedTopic.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.derived.*
67
import cats.effect.Concurrent
78
import cats.effect.std.Supervisor
89
import cats.syntax.apply.*
@@ -27,7 +28,7 @@ object ExecutionEventAddedTopic:
2728
visitId: Visit.Id,
2829
eventType: ExecutionEventType,
2930
users: List[User.Id]
30-
) extends TopicElement
31+
) extends TopicElement derives cats.Eq
3132

3233
private val topic =
3334
OdbTopic.define[(ExecutionEvent.Id, Program.Id, Observation.Id, Visit.Id, ExecutionEventType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/GroupTopic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -30,7 +32,7 @@ object GroupTopic:
3032
programId: Program.Id,
3133
editType: EditType,
3234
users: List[User.Id]
33-
) extends TopicElement
35+
) extends TopicElement derives Eq
3436

3537
private val topic =
3638
OdbTopic.define[(Option[Group.Id], Program.Id, EditType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/ObservationTopic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -29,7 +31,7 @@ object ObservationTopic:
2931
programId: Program.Id,
3032
editType: EditType,
3133
users: List[User.Id]
32-
) extends TopicElement
34+
) extends TopicElement derives Eq
3335

3436
private val topic =
3537
OdbTopic.define[(Observation.Id, Program.Id, EditType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/OdbTopic.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ object OdbTopic:
3737
): F[List[User.Id]] =
3838
s.execute(
3939
sql"""
40-
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull
40+
select c_user_id from t_program_user where c_program_id = '#${pid.toString}' and c_user_id notnull order by c_user_id
4141
""".query(user_id)
4242
)
4343

modules/service/src/main/scala/lucuma/odb/graphql/topic/ProgramTopic.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -24,8 +26,8 @@ object ProgramTopic:
2426
case class Element(
2527
programId: Program.Id,
2628
editType: EditType,
27-
users: List[User.Id],
28-
) extends TopicElement
29+
users: List[User.Id]
30+
) extends TopicElement derives Eq
2931

3032
private val topic =
3133
OdbTopic.define[(Program.Id, EditType), Element](

modules/service/src/main/scala/lucuma/odb/graphql/topic/TargetTopic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package lucuma.odb.graphql.topic
55

6+
import cats.Eq
7+
import cats.derived.*
68
import cats.effect.*
79
import cats.effect.std.Supervisor
810
import cats.implicits.*
@@ -30,7 +32,7 @@ object TargetTopic:
3032
programId: Program.Id,
3133
editType: EditType,
3234
users: List[User.Id]
33-
) extends TopicElement
35+
) extends TopicElement derives Eq
3436

3537
private val topic =
3638
OdbTopic.define[(Target.Id, Program.Id, EditType), Element](

modules/service/src/test/scala/lucuma/odb/graphql/subscription/observationEditSn.scala

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import lucuma.odb.graphql.query.ExecutionTestSupport
1414
import lucuma.odb.graphql.query.ObservingModeSetupOperations
1515

1616
class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOperations with SubscriptionUtils:
17+
val AcquisitionTotal: Long = 219362500l
18+
val ScienceTotal: Long = 784200000l
1719

1820
def subscriptionQuery(pid: Program.Id) =
1921
s"""
@@ -23,7 +25,26 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
2325
editType
2426
value {
2527
id
28+
itc {
29+
science {
30+
selected {
31+
exposureCount
32+
}
33+
}
34+
}
2635
execution {
36+
digest {
37+
acquisition {
38+
timeEstimate {
39+
total { microseconds }
40+
}
41+
}
42+
science {
43+
timeEstimate {
44+
total { microseconds }
45+
}
46+
}
47+
}
2748
config {
2849
gmosNorth {
2950
science {
@@ -46,7 +67,30 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
4667
"editType" -> Json.fromString(EditType.Updated.tag.toUpperCase),
4768
"value" -> Json.obj(
4869
"id" -> oid.asJson,
70+
"itc" -> Json.obj(
71+
"science" -> Json.obj(
72+
"selected" -> Json.obj(
73+
"exposureCount" -> 6.asJson
74+
)
75+
)
76+
),
4977
"execution" -> Json.obj(
78+
"digest" -> Json.obj(
79+
"acquisition" -> Json.obj(
80+
"timeEstimate" -> Json.obj(
81+
"total" -> Json.obj(
82+
"microseconds" -> AcquisitionTotal.asJson
83+
)
84+
)
85+
),
86+
"science" -> Json.obj(
87+
"timeEstimate" -> Json.obj(
88+
"total" -> Json.obj(
89+
"microseconds" -> ScienceTotal.asJson
90+
)
91+
)
92+
)
93+
),
5094
"config" -> Json.obj(
5195
"gmosNorth" -> Json.obj(
5296
"science" -> Json.obj(
@@ -72,12 +116,18 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
72116
SET: {
73117
scienceRequirements: {
74118
spectroscopy: {
119+
wavelength: { micrometers: 0.700000 }
120+
resolution: 1000
75121
exposureTimeMode: {
76122
signalToNoise: {
77123
value: 99
78124
at: { nanometers: 500 }
79125
}
80126
}
127+
wavelengthCoverage: { micrometers: 0.400000 }
128+
focalPlane: null
129+
focalPlaneAngle: null
130+
capability: null
81131
}
82132
}
83133
},
@@ -107,32 +157,10 @@ class observationEditSn extends ExecutionTestSupport with ObservingModeSetupOper
107157
tid <- createTargetWithProfileAs(pi, pid)
108158
oid <- createGmosNorthLongSlitObservationAs(pi, pid, List(tid))
109159
_ <- generateOrFail(pid, oid)
110-
// expect two responses, one from editing the S/N, one because our query
111-
// requests the sequence which requires a cache update
112160
_ <- subscriptionExpect(
113161
user = pi,
114162
query = subscriptionQuery(pid),
115163
mutations = Right(sleep >> updateSn(oid)),
116164
expected = List(subscriptionResponse(oid), subscriptionResponse(oid))
117165
)
118-
yield ()
119-
120-
test("does not trigger for subsequent generation"):
121-
for
122-
pid <- createProgram(pi, "foo")
123-
tid <- createTargetWithProfileAs(pi, pid)
124-
oid <- createGmosNorthLongSlitObservationAs(pi, pid, List(tid))
125-
_ <- generateOrFail(pid, oid)
126-
_ <- subscriptionExpect(
127-
user = pi,
128-
query = subscriptionQuery(pid),
129-
mutations = Right(
130-
sleep >>
131-
updateSn(oid) >>
132-
generateOrFail(pid, oid).void >>
133-
generateOrFail(pid, oid).void >>
134-
sleep
135-
),
136-
expected = List(subscriptionResponse(oid), subscriptionResponse(oid))
137-
)
138166
yield ()

0 commit comments

Comments
 (0)