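# NOTE: the following lines are not part of the workflow definition; they
# appear to be page text captured together with the file.
# Skip to content
# test
# test #5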

# Workflow header: display name, trigger, and environment variables shared by
# every job in this file.
name: Azure Queue Events

on:
  push:
    branches:
      - '**'  # This will run on all branches

env:
  # Storage account name, read from the configured secrets.
  AZURE_QUEUE_STORAGE_ACCOUNT: ${{ secrets.AZURE_QUEUE_STORAGE_ACCOUNT }}
  # The run id suffix gives each workflow run its own queue name.
  AZURE_STORAGE_QUEUE_NAME: events-queue-${{ github.run_id }}
jobs:
  # Creates the per-run queue and enqueues 20 small JSON test events.
  process-events:
    runs-on: ubuntu-latest
    environment: nemo-ci
    permissions:
      id-token: write  # lets azure/login request an OIDC token
      contents: read
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Azure Login
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZURE_CLIENT_ID }}
          tenant-id: ${{ secrets.AZURE_TENANT_ID }}
          subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'

      - name: Install Azure Storage SDK
        run: |
          python -m pip install --upgrade pip
          pip install azure-storage-queue azure-identity

      - name: Process Events
        # The heredoc delimiter is quoted so the shell passes the Python
        # source through verbatim (no $ or backtick expansion).
        run: |
          python - <<'EOF'
          import json
          import os

          from azure.core.exceptions import ResourceExistsError
          from azure.identity import DefaultAzureCredential
          from azure.storage.queue import QueueServiceClient

          # Synthetic payload: 20 small events generated inline.
          events_list = [{"test": n} for n in range(20)]

          # Account and queue names come from the job environment.
          account_name = os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')
          account_url = f"https://{account_name}.queue.core.windows.net"
          queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')

          # DefaultAzureCredential tries several credential sources in turn.
          # NOTE(review): presumably it resolves to the session created by the
          # "Azure Login" step - confirm.
          credential = DefaultAzureCredential()
          queue_service = QueueServiceClient(account_url=account_url, credential=credential)
          queue_client = queue_service.get_queue_client(queue_name)

          # create_queue() raises when the queue already exists (for example
          # when this job is re-run under the same run id), so tolerate that.
          try:
              queue_client.create_queue()
          except ResourceExistsError:
              print(f"Queue {queue_name} already exists - reusing it")

          # Send each event to the queue as a JSON string.
          for event in events_list:
              queue_client.send_message(json.dumps(event))
          print(f"Successfully sent {len(events_list)} events to queue {queue_name}")
          EOF

  # Three parallel workers drain the queue; each exits once it sees no more
  # messages.
  read-events:
    needs: process-events
    runs-on: ubuntu-latest
    environment: nemo-ci
    permissions:
      id-token: write  # lets azure/login request an OIDC token
      contents: read
    strategy:
      matrix:
        worker: [1, 2, 3]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Azure Login
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZURE_CLIENT_ID }}
          tenant-id: ${{ secrets.AZURE_TENANT_ID }}
          subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'

      - name: Install Azure Storage SDK
        run: |
          python -m pip install --upgrade pip
          pip install azure-storage-queue azure-identity

      - name: Read Events
        env:
          # Matrix value used only to label this worker's log lines.
          WORKER_ID: ${{ matrix.worker }}
        run: |
          python - <<'EOF'
          import json
          import os
          import sys
          import time

          from azure.identity import DefaultAzureCredential
          from azure.storage.queue import QueueServiceClient

          account_name = os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')
          account_url = f"https://{account_name}.queue.core.windows.net"
          queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
          worker_id = os.environ.get('WORKER_ID')

          # NOTE(review): presumably resolves to the session created by the
          # "Azure Login" step - confirm.
          credential = DefaultAzureCredential()
          queue_service = QueueServiceClient(account_url=account_url, credential=credential)
          queue_client = queue_service.get_queue_client(queue_name)

          # Wait for a visible message: up to 30 polls, 2 seconds apart.
          max_attempts = 30
          for attempt in range(1, max_attempts + 1):
              try:
                  if queue_client.peek_messages(max_messages=1):
                      break
                  print(f"Worker {worker_id}: No messages yet, attempt {attempt}/{max_attempts}")
              except Exception as e:
                  print(f"Worker {worker_id}: Error peeking messages: {e}")
              time.sleep(2)
          else:
              # Every poll came back empty (or failed): nothing to do.
              print(f"Worker {worker_id}: No messages found in queue after waiting - exiting successfully")
              sys.exit(0)

          # receive_messages() returns a lazy pager object that is always
          # truthy, so testing it with "not" can never detect an empty queue.
          # Iterating it fetches one message per request and stops when the
          # service returns no more messages, so one pass drains the queue
          # and always terminates.
          try:
              for message in queue_client.receive_messages(messages_per_page=1):
                  try:
                      content = json.loads(message.content)
                  except json.JSONDecodeError as e:
                      # The malformed message is logged and left undeleted.
                      print(f"Worker {worker_id}: Error decoding message: {e}")
                      print(f"Worker {worker_id}: Raw message content: {message.content}")
                      continue
                  print(f"Worker {worker_id}: Received message: {json.dumps(content, indent=2)}")
                  # Delete the message after processing.
                  queue_client.delete_message(message.id, message.pop_receipt)
          except Exception as e:
              # Errors are logged but do not fail the job.
              print(f"Worker {worker_id}: Error processing messages: {e}")
          else:
              print(f"Worker {worker_id}: No more messages in queue - exiting successfully")
          print(f"Worker {worker_id}: Finished processing all messages")
          EOF

  # Deletes the per-run queue.
  cleanup-queue:
    needs: read-events
    # Run even when a reader failed or was skipped, so the queue created for
    # this run is not left behind.
    if: always()
    runs-on: ubuntu-latest
    environment: nemo-ci
    permissions:
      id-token: write  # lets azure/login request an OIDC token
      contents: read
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Azure Login
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZURE_CLIENT_ID }}
          tenant-id: ${{ secrets.AZURE_TENANT_ID }}
          subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'

      - name: Install Azure Storage SDK
        run: |
          python -m pip install --upgrade pip
          pip install azure-storage-queue azure-identity

      - name: Delete Queue
        run: |
          python - <<'EOF'
          import os

          from azure.identity import DefaultAzureCredential
          from azure.storage.queue import QueueServiceClient

          account_name = os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')
          account_url = f"https://{account_name}.queue.core.windows.net"
          queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')

          # NOTE(review): presumably resolves to the session created by the
          # "Azure Login" step - confirm.
          credential = DefaultAzureCredential()
          queue_service = QueueServiceClient(account_url=account_url, credential=credential)
          queue_client = queue_service.get_queue_client(queue_name)

          try:
              queue_client.delete_queue()
              print(f"Successfully deleted queue: {queue_name}")
          except Exception as e:
              # Don't fail the job if queue deletion fails: log the error and
              # let the script finish normally.
              print(f"Error deleting queue: {e}")
          EOF