Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions cmd/dispatchoor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,33 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
m := metrics.New()
m.SetBuildInfo(Version, GitCommit, BuildDate)

// Create GitHub client (may operate in disconnected mode if no token or invalid token).
var ghClient github.Client
// Create GitHub clients.
// - runnersClient: used for polling runner status (uses runners_token if set, else token)
// - dispatchClient: used for dispatching workflows (uses token)
var runnersClient github.Client

var dispatchClient github.Client

var poller github.Poller

if cfg.HasGitHubToken() {
ghClient = github.NewClient(log, cfg.GitHub.Token)
// Create runners client for polling (uses runners_token if configured, else falls back to token).
if cfg.HasRunnersToken() {
runnersToken := cfg.GetRunnersToken()
runnersClient = github.NewClient(log.WithField("client", "runners"), runnersToken)

if err := ghClient.Start(ctx); err != nil {
if err := runnersClient.Start(ctx); err != nil {
return err
}

defer func() {
if err := ghClient.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop GitHub client")
if err := runnersClient.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop runners GitHub client")
}
}()

// Only start poller if GitHub client is connected.
if ghClient.IsConnected() {
poller = github.NewPoller(log, cfg, ghClient, st, m)
// Only start poller if runners client is connected.
if runnersClient.IsConnected() {
poller = github.NewPoller(log, cfg, runnersClient, st, m)

if err := poller.Start(ctx); err != nil {
return err
Expand All @@ -112,10 +118,31 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
}
}()
} else {
log.Warn("GitHub client not connected - runner polling disabled")
log.Warn("Runners GitHub client not connected - runner polling disabled")
}
} else {
log.Warn("No GitHub token configured for runners - runner polling disabled")
}

// Create dispatch client for workflow dispatching (uses main token).
if cfg.HasGitHubToken() {
dispatchClient = github.NewClient(log.WithField("client", "dispatch"), cfg.GitHub.Token)

if err := dispatchClient.Start(ctx); err != nil {
return err
}

defer func() {
if err := dispatchClient.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop dispatch GitHub client")
}
}()

if !dispatchClient.IsConnected() {
log.Warn("Dispatch GitHub client not connected - workflow dispatch disabled")
}
} else {
log.Warn("No GitHub token configured - GitHub integration disabled")
log.Warn("No GitHub token configured for dispatch - workflow dispatch disabled")
}

// Create queue service.
Expand All @@ -127,11 +154,11 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error

defer queueSvc.Stop()

// Create and start dispatcher (only if GitHub client is connected).
// Create and start dispatcher (only if dispatch client is connected).
var disp dispatcher.Dispatcher

if ghClient != nil && ghClient.IsConnected() {
disp = dispatcher.NewDispatcher(log, cfg, st, queueSvc, ghClient)
if dispatchClient != nil && dispatchClient.IsConnected() {
disp = dispatcher.NewDispatcher(log, cfg, st, queueSvc, dispatchClient)

if err := disp.Start(ctx); err != nil {
return err
Expand All @@ -142,8 +169,6 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
log.WithError(err).Warn("Failed to stop dispatcher")
}
}()
} else {
log.Warn("Dispatcher disabled - GitHub client not connected")
}

// Create and start auth service.
Expand All @@ -156,7 +181,7 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
defer authSvc.Stop()

// Create and start API server.
srv := api.NewServer(log, cfg, st, queueSvc, authSvc, ghClient, m)
srv := api.NewServer(log, cfg, st, queueSvc, authSvc, runnersClient, dispatchClient, m)

// Set up runner change callbacks to broadcast via WebSocket.
if poller != nil {
Expand Down
2 changes: 2 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ database:

github:
token: ${GITHUB_TOKEN}
# Optional: separate token for listing runners (falls back to token if not set)
# runners_token: ${GITHUB_RUNNERS_TOKEN}
poll_interval: 60s
rate_limit_buffer: 100

Expand Down
134 changes: 74 additions & 60 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,36 @@ type Server interface {

// server implements Server.
type server struct {
log logrus.FieldLogger
cfg *config.Config
store store.Store
queue queue.Service
auth auth.Service
ghClient github.Client
metrics *metrics.Metrics
hub *Hub
srv *http.Server
router chi.Router
log logrus.FieldLogger
cfg *config.Config
store store.Store
queue queue.Service
auth auth.Service
runnersClient github.Client
dispatchClient github.Client
metrics *metrics.Metrics
hub *Hub
srv *http.Server
router chi.Router
}

// Ensure server implements Server.
var _ Server = (*server)(nil)

// NewServer creates a new API server.
func NewServer(log logrus.FieldLogger, cfg *config.Config, st store.Store, q queue.Service, authSvc auth.Service, ghClient github.Client, m *metrics.Metrics) Server {
func NewServer(log logrus.FieldLogger, cfg *config.Config, st store.Store, q queue.Service, authSvc auth.Service, runnersClient, dispatchClient github.Client, m *metrics.Metrics) Server {
hub := NewHub(log)

s := &server{
log: log.WithField("component", "api"),
cfg: cfg,
store: st,
queue: q,
auth: authSvc,
ghClient: ghClient,
metrics: m,
hub: hub,
log: log.WithField("component", "api"),
cfg: cfg,
store: st,
queue: q,
auth: authSvc,
runnersClient: runnersClient,
dispatchClient: dispatchClient,
metrics: m,
hub: hub,
}

// Set up callback to broadcast job state changes via WebSocket.
Expand Down Expand Up @@ -367,58 +369,64 @@ func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
}
}

// GitHub connection and rate limit info.
if s.ghClient == nil {
resp.GitHub = GitHubStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: "GitHub token not configured",
}
// GitHub connection and rate limit info for both clients.
resp.GitHub = GitHubClientsStatus{}

if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
} else if !s.ghClient.IsConnected() {
resp.GitHub = GitHubStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: s.ghClient.ConnectionError(),
// Helper function to get status for a single client.
getClientStatus := func(client github.Client, name string) *GitHubClientStatus {
if client == nil {
return &GitHubClientStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: name + " token not configured",
}
}

if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
if !client.IsConnected() {
return &GitHubClientStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: client.ConnectionError(),
}
}
} else {
remaining := s.ghClient.RateLimitRemaining()
resetTime := s.ghClient.RateLimitReset()

githubStatus := ComponentStatusHealthy
remaining := client.RateLimitRemaining()
resetTime := client.RateLimitReset()

clientStatus := ComponentStatusHealthy
if remaining < 100 {
githubStatus = ComponentStatusDegraded
clientStatus = ComponentStatusDegraded
}

if remaining < 10 {
githubStatus = ComponentStatusUnhealthy

if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
clientStatus = ComponentStatusUnhealthy
}

resetIn := time.Until(resetTime)
if resetIn < 0 {
resetIn = 0
}

resp.GitHub = GitHubStatus{
Status: githubStatus,
return &GitHubClientStatus{
Status: clientStatus,
Connected: true,
RateLimitRemaining: remaining,
RateLimitReset: resetTime.UTC().Format(time.RFC3339),
ResetIn: resetIn.Round(time.Second).String(),
}
}

resp.GitHub.Runners = getClientStatus(s.runnersClient, "Runners")
resp.GitHub.Dispatch = getClientStatus(s.dispatchClient, "Dispatch")

// Update overall status based on GitHub clients.
if (resp.GitHub.Runners != nil && resp.GitHub.Runners.Status == ComponentStatusUnhealthy) ||
(resp.GitHub.Dispatch != nil && resp.GitHub.Dispatch.Status == ComponentStatusUnhealthy) {
if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
}

// Queue statistics.
pendingJobs, _ := s.store.ListJobsByStatus(ctx, store.JobStatusPending)
triggeredJobs, _ := s.store.ListJobsByStatus(ctx, store.JobStatusTriggered)
Expand Down Expand Up @@ -1127,21 +1135,21 @@ func (s *server) handleCancelJob(w http.ResponseWriter, r *http.Request) {
return
}

// Check if GitHub client is available.
if s.ghClient == nil || !s.ghClient.IsConnected() {
// Check if dispatch client is available.
if s.dispatchClient == nil || !s.dispatchClient.IsConnected() {
s.writeError(w, http.StatusServiceUnavailable, "GitHub integration is not available")

return
}

// Cancel the workflow run on GitHub.
if err := s.ghClient.CancelWorkflowRun(r.Context(), owner, repo, *job.RunID); err != nil {
if err := s.dispatchClient.CancelWorkflowRun(r.Context(), owner, repo, *job.RunID); err != nil {
s.log.WithError(err).Warn("Cancel request returned error, checking actual run status")

// Check if the run was actually cancelled despite the error.
// GitHub can return transient errors like "job scheduled on GitHub side"
// even when the cancellation succeeds.
run, getErr := s.ghClient.GetWorkflowRun(r.Context(), owner, repo, *job.RunID)
run, getErr := s.dispatchClient.GetWorkflowRun(r.Context(), owner, repo, *job.RunID)
if getErr != nil {
s.log.WithError(getErr).Error("Failed to verify workflow run status after cancel error")
s.writeError(w, http.StatusInternalServerError, "Failed to cancel workflow run on GitHub")
Expand Down Expand Up @@ -1320,8 +1328,8 @@ type DatabaseStatus struct {
Error string `json:"error,omitempty"`
}

// GitHubStatus contains GitHub API rate limit information.
type GitHubStatus struct {
// GitHubClientStatus contains status and rate limit information for a single GitHub client.
type GitHubClientStatus struct {
Status ComponentStatus `json:"status"`
Connected bool `json:"connected"`
Error string `json:"error,omitempty"`
Expand All @@ -1330,6 +1338,12 @@ type GitHubStatus struct {
ResetIn string `json:"reset_in,omitempty"`
}

// GitHubClientsStatus contains status for both GitHub clients.
type GitHubClientsStatus struct {
Runners *GitHubClientStatus `json:"runners,omitempty"`
Dispatch *GitHubClientStatus `json:"dispatch,omitempty"`
}

// QueueStats contains queue statistics.
type QueueStats struct {
PendingJobs int `json:"pending_jobs"`
Expand All @@ -1346,12 +1360,12 @@ type VersionInfo struct {

// SystemStatusResponse is the comprehensive status response.
type SystemStatusResponse struct {
Status ComponentStatus `json:"status"`
Timestamp string `json:"timestamp"`
Database DatabaseStatus `json:"database"`
GitHub GitHubStatus `json:"github"`
Queue QueueStats `json:"queue"`
Version VersionInfo `json:"version"`
Status ComponentStatus `json:"status"`
Timestamp string `json:"timestamp"`
Database DatabaseStatus `json:"database"`
GitHub GitHubClientsStatus `json:"github"`
Queue QueueStats `json:"queue"`
Version VersionInfo `json:"version"`
}

// HistoryResponse wraps the paginated history response.
Expand Down
16 changes: 16 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type PostgresConfig struct {
// GitHubConfig contains GitHub API settings.
type GitHubConfig struct {
Token string `yaml:"token"`
RunnersToken string `yaml:"runners_token"`
PollInterval time.Duration `yaml:"poll_interval"`
RateLimitBuffer int `yaml:"rate_limit_buffer"`
}
Expand Down Expand Up @@ -475,6 +476,21 @@ func (c *Config) HasGitHubToken() bool {
return c.GitHub.Token != ""
}

// GetRunnersToken returns the token to use for listing runners.
// Returns RunnersToken if configured, otherwise falls back to Token.
func (c *Config) GetRunnersToken() string {
if c.GitHub.RunnersToken != "" {
return c.GitHub.RunnersToken
}

return c.GitHub.Token
}

// HasRunnersToken returns true if a token for listing runners is available.
func (c *Config) HasRunnersToken() bool {
return c.GetRunnersToken() != ""
}

// String returns a sanitized string representation of the config (no secrets).
func (c *Config) String() string {
var sb strings.Builder
Expand Down
Loading
Loading