Skip to content

Commit aa63613

Browse files
authored
Added SignalWorkflowSkippingDecision (#753)
Added support for buffering a signal without delivering it to a workflow. Needed to test async channel draining before completing a workflow.
1 parent 46e0d01 commit aa63613

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed

internal/internal_workflow_testsuite.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,14 +1804,14 @@ func (env *testWorkflowEnvironmentImpl) cancelWorkflow(callback resultHandler) {
18041804
}, true)
18051805
}
18061806

1807-
func (env *testWorkflowEnvironmentImpl) signalWorkflow(name string, input interface{}) {
1807+
func (env *testWorkflowEnvironmentImpl) signalWorkflow(name string, input interface{}, startDecisionTask bool) {
18081808
data, err := encodeArg(env.GetDataConverter(), input)
18091809
if err != nil {
18101810
panic(err)
18111811
}
18121812
env.postCallback(func() {
18131813
env.signalHandler(name, data)
1814-
}, true)
1814+
}, startDecisionTask)
18151815
}
18161816

18171817
func (env *testWorkflowEnvironmentImpl) signalWorkflowByID(workflowID, signalName string, input interface{}) error {
@@ -1888,7 +1888,7 @@ func newTestSessionEnvironment(testWorkflowEnvironment *testWorkflowEnvironmentI
18881888
}
18891889

18901890
func (t *testSessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error {
1891-
t.testWorkflowEnvironment.signalWorkflow(sessionID, t.sessionEnvironmentImpl.getCreationResponse())
1891+
t.testWorkflowEnvironment.signalWorkflow(sessionID, t.sessionEnvironmentImpl.getCreationResponse(), true)
18921892
return nil
18931893
}
18941894

internal/internal_workflow_testsuite_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1968,6 +1968,38 @@ func (s *WorkflowTestSuiteUnitTest) Test_ContextMisuse() {
19681968
s.Contains(env.GetWorkflowError().Error(), "block on coroutine which is already blocked")
19691969
}
19701970

1971+
func (s *WorkflowTestSuiteUnitTest) Test_DrainSignalChannel() {
1972+
workflowFn := func(ctx Context) (string, error) {
1973+
1974+
signalCh := GetSignalChannel(ctx, "test-signal")
1975+
var s1, s2, s3 string
1976+
signalCh.Receive(ctx, &s1)
1977+
if !signalCh.ReceiveAsync(&s2) {
1978+
return "", errors.New("expected signal")
1979+
}
1980+
if signalCh.ReceiveAsync(&s3) {
1981+
return "", errors.New("unexpected signal")
1982+
}
1983+
return s1 + s2, nil
1984+
}
1985+
1986+
RegisterWorkflow(workflowFn)
1987+
env := s.NewTestWorkflowEnvironment()
1988+
1989+
env.RegisterDelayedCallback(func() {
1990+
env.SignalWorkflowSkippingDecision("test-signal", "s1")
1991+
env.SignalWorkflow("test-signal", "s2")
1992+
}, time.Minute)
1993+
1994+
env.ExecuteWorkflow(workflowFn)
1995+
1996+
s.True(env.IsWorkflowCompleted())
1997+
s.NoError(env.GetWorkflowError())
1998+
var result string
1999+
env.GetWorkflowResult(&result)
2000+
s.Equal("s1s2", result)
2001+
}
2002+
19712003
func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetry() {
19722004
attempt1Count := 0
19732005
activityFailedFn := func(ctx context.Context) (string, error) {

internal/workflow_testsuite.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,14 @@ func (t *TestWorkflowEnvironment) CancelWorkflow() {
563563

564564
// SignalWorkflow sends signal to the currently running test workflow.
565565
func (t *TestWorkflowEnvironment) SignalWorkflow(name string, input interface{}) {
566-
t.impl.signalWorkflow(name, input)
566+
t.impl.signalWorkflow(name, input, true)
567+
}
568+
569+
// SignalWorkflowSkippingDecision sends signal to the currently running test workflow without invoking workflow code.
570+
// Used to test processing of multiple buffered signals before completing workflow.
571+
// It must be followed by SignalWorkflow, CancelWorkflow or CompleteActivity to force a decision.
572+
func (t *TestWorkflowEnvironment) SignalWorkflowSkippingDecision(name string, input interface{}) {
573+
t.impl.signalWorkflow(name, input, false)
567574
}
568575

569576
// SignalWorkflowByID sends signal to the currently running test workflow.

0 commit comments

Comments
 (0)