diff --git a/docs/src/usage/index.md b/docs/src/usage/index.md index 1031492..f343803 100644 --- a/docs/src/usage/index.md +++ b/docs/src/usage/index.md @@ -17,12 +17,8 @@ With that in mind, I would highly recommend you become familiar with the [Plugin ## Authentication The first time you try to do anything which requires authentication, you will be automatically prompted for your credentials. -This prompt will expect the `Synack Email` and `Synack Password`, which are fairly self explanitory, but it also asks for the `Synack OTP Secret`. +This prompt will expect the `Synack Email` and `Synack Password`, which are fairly self explanatory. -The `Synack OTP Secret` is NOT the 8 digit code you pull out of Authy. -Instead, it is a string that you must extract from Authy via a method similar to the one found [here](https://gist.github.com/gboudreau/94bb0c11a6209c82418d01a59d958c93). - -Use the above instructions at your own discression. -I TAKE NO RESPONSIBILITY IF SOMETHING BAD HAPPENS AS A RESULT. +For Duo MFA setup options, see the [Duo plugin documentation](./plugins/duo.md). Once you complete these steps, your credentials are stored in a SQLiteDB at `~/.config/synack/synackapi.db`. diff --git a/docs/src/usage/main-components/state.md b/docs/src/usage/main-components/state.md index 21df449..4f8fc90 100644 --- a/docs/src/usage/main-components/state.md +++ b/docs/src/usage/main-components/state.md @@ -55,12 +55,16 @@ In the event that one of the State variables is set and is **not** constantly at | api_token | str | This is the Synack Access Token used to authenticate requests | config_dir | pathlib.Path | The location of the Database and Login script | debug | bool | Used to show/hide debugging messages +| duo_push_akey | str | Duo device activation key for push auto-approval +| duo_push_host | str | Duo API hostname for push auto-approval +| duo_push_pkey | str | Duo device private key for push auto-approval +| duo_push_rsa_key_path | str | Path to RSA private key for signing Duo API requests | email | str | Your email address used to log into Synack | http_proxy | str | A Web Proxy (Burp, etc.) to intercept requests | https_proxy | str | A Web Proxy (Burp, etc.) to intercept requests | login | bool | Used to enable/disable a check of the api_token upon creation of the Handler | notifications_token | str | Token used for authentication when dealing with Synack Notifications -| otp_secret | str | OTP Secret held by Authy. NOT an OTP. For more information, read the Usage page +| otp_secret | str | OTP Secret held by Duo Mobile. NOT an OTP. For more information, read the Usage page | password | str | Your Synack Password | session | requests.Session | Tracks cookies and headers across various functions | template_dir | pathlib.Path | The location of your Mission Templates diff --git a/docs/src/usage/plugins/duo.md b/docs/src/usage/plugins/duo.md index 0347edd..990f4ef 100644 --- a/docs/src/usage/plugins/duo.md +++ b/docs/src/usage/plugins/duo.md @@ -1,5 +1,58 @@ # Duo +## Duo MFA Options + +When prompted during authentication, you can choose from three options: + +**Option 1: Manual Push Approval (Simplest)** +- Press Enter when prompted for OTP Secret +- Approve push notifications on your phone each time the token is expired +- No additional setup required + +**Option 2: Automated OTP (Preferred)** +- Enter your OTP Secret when prompted (accepts both hex and base32 formats) +- Automatically generates OTP codes using a counter (saved in the database) +- Extract the `hotp_secret` from Duo Mobile using [synackDUO](https://github.com/dinosn/synackDUO) (see `response.json`) +- **Note:** This is NOT the 8-digit codes from Duo Mobile, but the HOTP secret key + +**Option 3: Automated Duo Push** +- Uses Duo credentials to auto-approve push requests +- Can also approve push requests using duo.approve_pending_push(timeout) +- Extract credentials using [synackDUO](https://github.com/dinosn/synackDUO) (see `response.json`) (see below) + + +**Disclaimer:** Use the above instructions at your own discretion. I TAKE NO RESPONSIBILITY IF SOMETHING BAD HAPPENS AS A RESULT. + +## Duo Push Auto-Approval Setup + +The Duo plugin supports push notification approval using device credentials. + +### Prerequisites + +You must extract and configure four credentials from Duo Mobile: + +| Credential | Description | Example +| --- | --- | --- +| `duo_push_akey` | Device activation key | `DAXXXXXXXXXXXXXXXXXXXX` +| `duo_push_pkey` | Device private key | `DPXXXXXXXXXXXXXXXXXXXX` +| `duo_push_host` | Duo API hostname | `api-xxxxxxxx.duosecurity.com` +| `duo_push_rsa_key_path` | Path to RSA private key | `~/.config/synack/duo/key.pem` + +### Configuration + +Set credentials in the database: +PP +```python +import synack + +h = synack.Handler(login=False) + +h.db.set_config('duo_push_akey', 'DAXXXXXXXXXXXXXXXXXX') +h.db.set_config('duo_push_pkey', 'DPXXXXXXXXXXXXXXXXXX') +h.db.set_config('duo_push_host', 'api-xxxxxxxx.duosecurity.com') +h.db.set_config('duo_push_rsa_key_path', 'synackDUO/key.pem') +``` + ## duo.get_grant_token(auth_url) > Handles Duo Security MFA stages and returns the grant_token used to finish logging into Synack @@ -13,3 +66,21 @@ >> >>> h.duo.get_grant_token('https:///...duosecurity.com/...') >> 'Y8....6g' >> ``` + +## duo.approve_pending_push(timeout) + +> Wait for and approve a single Duo push notification +> +> Polls Duo's device API for pending push notifications and automatically approves the first one found. Useful for automated workflows that need to handle Duo MFA. +> +> | Argument | Type | Default | Description +> | --- | --- | --- | --- +> | `timeout` | int | 30 | Maximum seconds to wait for a push notification +> +> Returns `True` if a push was approved, `False` if timeout or error occurred. +> +>> Examples +>> ```python3 +>> >>> h.duo.approve_pending_push(timeout=60) +>> True +>> ``` diff --git a/src/synack/_state.py b/src/synack/_state.py index 47348c2..ff09c87 100644 --- a/src/synack/_state.py +++ b/src/synack/_state.py @@ -41,6 +41,11 @@ def __init__(self): self._use_proxies = None self._use_scratchspace = None self._user_id = None + self._duo_push_akey = None + self._duo_push_pkey = None + self._duo_push_host = None + self._duo_push_rsa_key_path = None + self._duo_device = None @property def smtp_email_from(self) -> str: @@ -357,3 +362,58 @@ def user_id(self) -> str: @user_id.setter def user_id(self, value: str) -> None: self._user_id = value + + @property + def duo_push_akey(self) -> str: + ret = self._duo_push_akey + if ret is None: + ret = self._db.duo_push_akey + return ret + + @duo_push_akey.setter + def duo_push_akey(self, value: str) -> None: + self._duo_push_akey = value + + @property + def duo_push_pkey(self) -> str: + ret = self._duo_push_pkey + if ret is None: + ret = self._db.duo_push_pkey + return ret + + @duo_push_pkey.setter + def duo_push_pkey(self, value: str) -> None: + self._duo_push_pkey = value + + @property + def duo_push_host(self) -> str: + ret = self._duo_push_host + if ret is None: + ret = self._db.duo_push_host + return ret + + @duo_push_host.setter + def duo_push_host(self, value: str) -> None: + self._duo_push_host = value + + @property + def duo_push_rsa_key_path(self) -> str: + ret = self._duo_push_rsa_key_path + if ret is None: + ret = self._db.duo_push_rsa_key_path + return ret + + @duo_push_rsa_key_path.setter + def duo_push_rsa_key_path(self, value: str) -> None: + self._duo_push_rsa_key_path = value + + @property + def duo_device(self) -> str: + ret = self._duo_device + if ret is None: + ret = self._db.duo_device + return ret + + @duo_device.setter + def duo_device(self, value: str) -> None: + self._duo_device = value diff --git a/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py b/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py new file mode 100644 index 0000000..0fdde9a --- /dev/null +++ b/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py @@ -0,0 +1,38 @@ +"""add duo push method + +Revision ID: 6f542023f57e +Revises: 1434aa7ed47c +Create Date: 2025-11-06 08:23:42.181054 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '6f542023f57e' +down_revision = '1434aa7ed47c' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('config') as batch_op: + batch_op.add_column(sa.Column('duo_push_akey', sa.VARCHAR(length=200), nullable=True)) + batch_op.add_column(sa.Column('duo_push_pkey', sa.VARCHAR(length=200), nullable=True)) + batch_op.add_column(sa.Column('duo_push_host', sa.VARCHAR(length=100), nullable=True)) + batch_op.add_column(sa.Column('duo_push_rsa_key_path', sa.VARCHAR(length=250), nullable=True)) + batch_op.add_column(sa.Column('duo_device', sa.VARCHAR(length=50), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('config') as batch_op: + batch_op.drop_column('duo_device') + batch_op.drop_column('duo_push_rsa_key_path') + batch_op.drop_column('duo_push_host') + batch_op.drop_column('duo_push_pkey') + batch_op.drop_column('duo_push_akey') + # ### end Alembic commands ### diff --git a/src/synack/db/models/config.py b/src/synack/db/models/config.py index 943f1da..216b467 100644 --- a/src/synack/db/models/config.py +++ b/src/synack/db/models/config.py @@ -38,3 +38,10 @@ class Config(Base): user_id = sa.Column(sa.VARCHAR(20), default='') use_proxies = sa.Column(sa.BOOLEAN, default=False) use_scratchspace = sa.Column(sa.BOOLEAN, default=False) + duo_push_akey = sa.Column(sa.VARCHAR(200), default='') + duo_push_pkey = sa.Column(sa.VARCHAR(200), default='') + duo_push_host = sa.Column(sa.VARCHAR(100), default='') + duo_push_rsa_key_path = sa.Column( + sa.VARCHAR(250), default='' + ) + duo_device = sa.Column(sa.VARCHAR(50), default='') diff --git a/src/synack/plugins/api.py b/src/synack/plugins/api.py index 1cc70c1..19f4ef3 100644 --- a/src/synack/plugins/api.py +++ b/src/synack/plugins/api.py @@ -144,13 +144,17 @@ def request(self, method, path, attempts=0, **kwargs): f"\n\tData: {data}" + f"\n\tContent: {res.content}") - reason_failed = 'Request failed' - if res.status_code in [ 400, 401 ]: - reason_failed = 'Request failed' + reason_failed = None + if res.status_code == 400: + reason_failed = 'Bad request' + elif res.status_code == 401: + reason_failed = 'Unauthorized' elif res.status_code == 403: reason_failed = 'Logged out' elif res.status_code == 412: - fail_reason = 'Mission already claimed' + reason_failed = 'Mission already claimed' + elif res.status_code == 423: + reason_failed = 'Locked' elif res.status_code == 429: self._debug.log('Too many requests', f'({res.status_code} - {res.reason}) {res.url}') if attempts < 5: @@ -165,6 +169,8 @@ def request(self, method, path, attempts=0, **kwargs): attempts += 1 return self.request(method, path, attempts, **kwargs) - self._debug.log('Mission already claimed', f'({res.status_code} - {res.reason}) {res.url}') + # Log terminal failures (non-retryable errors) + if res.status_code in [400, 401, 403, 412, 423]: + self._debug.log(reason_failed, f'({res.status_code} - {res.reason}) {res.url}') return res diff --git a/src/synack/plugins/auth.py b/src/synack/plugins/auth.py index 5f569c2..1195ce8 100644 --- a/src/synack/plugins/auth.py +++ b/src/synack/plugins/auth.py @@ -62,9 +62,11 @@ def get_authentication_response(self, csrf): if res.status_code == 200: return res.json() elif res.status_code == 400: - csrf = self.get_login_csrf() - if csrf: - return self.get_authentication_response(csrf) + self._db.email = '' + self._db.password = '' + raise ValueError("Invalid email or password. Please run the script again to re-enter credentials.") + elif res.status_code == 423: + raise ValueError("Your account has been locked due to too many failed login attempts. Please wait and try again later.") def get_login_csrf(self): """Get the CSRF Token from the login page""" diff --git a/src/synack/plugins/db.py b/src/synack/plugins/db.py index 852c2a9..21a828d 100644 --- a/src/synack/plugins/db.py +++ b/src/synack/plugins/db.py @@ -428,8 +428,10 @@ def get_config(self, name=None): if not config: config = Config() session.add(config) + session.commit() + ret = getattr(config, name) if name else config session.close() - return getattr(config, name) if name else config + return ret @property def http_proxy(self): @@ -478,14 +480,49 @@ def otp_count(self, value): def otp_secret(self): ret = self.get_config('otp_secret') if not ret: - ret = input('Synack OTP Secret: ') - self.otp_secret = ret + # Skip prompt if automated push credentials are already configured + if self.duo_push_akey and self.duo_push_pkey and self.duo_push_host: + ret = '' + self.otp_secret = ret + # Skip prompt if user has already selected a device for manual push + elif self.duo_device: + ret = '' + else: + print("\nDuo MFA Authentication Setup:") + print( + "1. Press Enter to use Duo Push notifications " + "(you'll approve on your phone)" + ) + print("2. OR enter your Duo OTP Secret for automated passcode generation") + print(" (Accepts hex (hotp_secret) or base32 (otpauth://) format)") + ret = input('\nDuo OTP Secret (or press Enter for push): ').strip() + self.otp_secret = ret if ret else '' return ret @otp_secret.setter def otp_secret(self, value): + # Auto-detect and convert hex format to base32 + # Duo's hotp_secret is a hex string, but needs to be treated as UTF-8 + # not as hex bytes (based on duo-hotp reference implementation) + if value and self._is_hex_secret(value): + import base64 + # Encode the hex string as UTF-8 bytes, then base32 + value = base64.b32encode(value.encode('utf-8')).decode('ascii').rstrip('=') self.set_config('otp_secret', value) + def _is_hex_secret(self, value): + """Check if the secret appears to be in hex format (not base32)""" + # Hex: 32 chars using only 0-9, a-f + # Base32: variable length using A-Z, 2-7 + if len(value) != 32: + return False + try: + # If it can be decoded as hex, it's hex + bytes.fromhex(value) + return True + except ValueError: + return False + @property def password(self): ret = self.get_config('password') @@ -498,6 +535,46 @@ def password(self): def password(self, value): self.set_config('password', value) + @property + def duo_push_akey(self): + return self.get_config('duo_push_akey') + + @duo_push_akey.setter + def duo_push_akey(self, value): + self.set_config('duo_push_akey', value) + + @property + def duo_push_pkey(self): + return self.get_config('duo_push_pkey') + + @duo_push_pkey.setter + def duo_push_pkey(self, value): + self.set_config('duo_push_pkey', value) + + @property + def duo_push_host(self): + return self.get_config('duo_push_host') + + @duo_push_host.setter + def duo_push_host(self, value): + self.set_config('duo_push_host', value) + + @property + def duo_push_rsa_key_path(self): + return self.get_config('duo_push_rsa_key_path') + + @duo_push_rsa_key_path.setter + def duo_push_rsa_key_path(self, value): + self.set_config('duo_push_rsa_key_path', value) + + @property + def duo_device(self): + return self.get_config('duo_device') + + @duo_device.setter + def duo_device(self, value): + self.set_config('duo_device', value) + @property def ports(self): session = self.Session() diff --git a/src/synack/plugins/duo.py b/src/synack/plugins/duo.py index be3ff23..2ddcb4a 100644 --- a/src/synack/plugins/duo.py +++ b/src/synack/plugins/duo.py @@ -9,7 +9,14 @@ import json import pyotp import re +import requests import time +from datetime import UTC, datetime +from pathlib import Path +from urllib.parse import urlencode +from Crypto.Hash import SHA512 +from Crypto.PublicKey import RSA +from Crypto.Signature import pkcs1_15 class Duo(Plugin): @@ -33,6 +40,7 @@ def __init__(self, *args, **kwargs): self._sid = None self._txid = None self._xsrf = None + self._pubkey = None def _build_headers(self, overrides=None): headers = { @@ -56,7 +64,35 @@ def get_grant_token(self, auth_url): self._set_session_variables() # Yes, this needs to be called twice... self._get_txid() if self._txid: - self._get_status() + # Priority 1: OTP (if configured) + if self._state.otp_secret: + # OTP passcode already sent in _get_txid(), just poll for status + self._get_status() + # Priority 2: Auto-approval (if configured) - HARD FAIL if broken + elif self.is_configured(): + if not self.load_rsa_key(): + raise RuntimeError( + "Duo Push auto-approval is enabled but RSA key failed to load" + ) + print("Auto-approving Duo push notification...") + if self._state.debug: + print(f"Using device: {self._device}") + print(f"Configured duo_device: {self._state.duo_device}") + if self._device != self._state.duo_device: + print(f"WARNING: Push sent to {self._device} but credentials are for {self._state.duo_device}") + # Wait 2 seconds before polling to give Duo time to register the push + time.sleep(2) + if not self.approve_pending_push(timeout=25): + raise RuntimeError( + "Duo Push auto-approval failed - check credentials or " + "disable auto-approval. Ensure duo_device matches the device " + "with extracted credentials." + ) + self._get_status() + # Priority 3: Manual push (fallback) + else: + print("Waiting for manual Duo push approval on your device...") + self._get_status() if self._status == 'SUCCESS': self._get_oidc_exit() if self._progress_token: @@ -103,16 +139,69 @@ def _get_mfa_details(self): 'sid': self._sid } res = self._api.request('GET', f'{self._base_url}/frame/v4/auth/prompt/data', headers=headers, query=query) + if res.status_code == 200: - for method in res.json().get('response', {}).get('auth_method_order', []): - if method.get('factor', '') == 'Duo Push': - device_key = method.get('deviceKey', '') - break + response_json = res.json() + response_data = response_json.get('response', {}) + phones = response_data.get('phones', []) + + # If auto-approval credentials are configured, find the matching device + if self.is_configured(): + # Match device by pkey + pkey = self._state.duo_push_pkey + for phone in phones: + if phone.get('key', '') == pkey: + self._device = phone.get('index', '') + self._factor = 'Duo Push' + # Update stored device if it doesn't match + if self._state.duo_device != self._device: + print(f"Auto-correcting duo_device from {self._state.duo_device} to {self._device}") + self._db.duo_device = self._device + return + # If no match found, credentials are for wrong account + print(f"WARNING: duo_push_pkey {pkey} not found in available devices") + print("Falling back to manual device selection") + + # Check if we have a stored device preference + if self._state.duo_device: + # Use the stored device + for phone in phones: + if phone.get('index', '') == self._state.duo_device: + self._device = phone.get('index', '') + self._factor = 'Duo Push' + return + # If stored device not found, fall through to prompt - for phone in res.json().get('response', {}).get('phones', []): - if phone.get('key', '') == device_key: - self._device = phone.get('index', '') - self._factor = 'Duo Push' + # Prompt user to select a device + if phones: + print("\nAvailable Duo devices:") + for i, phone in enumerate(phones, 1): + print(f"{i}. {phone.get('name', 'Unknown')} ({phone.get('index', '')})") + + while True: + try: + choice = input("\nSelect device number (or press Enter for first device): ").strip() + if not choice: + selected_phone = phones[0] + break + choice_num = int(choice) + if 1 <= choice_num <= len(phones): + selected_phone = phones[choice_num - 1] + break + print(f"Please enter a number between 1 and {len(phones)}") + except ValueError: + print("Please enter a valid number") + + self._device = selected_phone.get('index', '') + self._factor = 'Duo Push' + self._db.duo_device = self._device + return + + if not self._device or not self._factor: + raise ValueError( + f'Failed to determine MFA device/factor from Duo API. ' + f'HTTP {res.status_code}, device={self._device}, factor={self._factor}' + ) def _get_oidc_exit(self): headers = { @@ -204,7 +293,8 @@ def _get_status(self): 'txid': self._txid, 'sid': self._sid } - for i in range(5): + # Increase polling attempts from 5 to 12 (1 minute total with 5s intervals) + for i in range(12): res = self._api.request('POST', f'{self._base_url}/frame/v4/status', headers=headers, data=data) if res.status_code == 200: status_enum = res.json().get('response', {}).get('status_enum', -1) @@ -223,8 +313,9 @@ def _get_status(self): break elif status_enum == 13: # Awaiting Push Notification pass - elif status_enum == 15: # Push Notification MFA Blocked - break + elif status_enum == 15: # Push sent, waiting for approval + # Continue polling for both auto-approval and manual approval + pass elif status_enum == 44: # Prior Code self._db.otp_count += 5 break @@ -297,3 +388,155 @@ def _set_session_variables(self): res = self._api.request('POST', self._referrer, headers=headers, data=self._session_vars) if res.status_code == 200: self._referrer = res.url + + # Duo Push Auto-Approval Methods + + def is_configured(self): + """Check if Duo push auto-approval credentials are configured""" + return ( + self._state.duo_push_akey and + self._state.duo_push_pkey and + self._state.duo_push_host + ) + + def load_rsa_key(self): + """Load RSA key from configured path""" + if not self.is_configured(): + return False + + key_path = Path(self._state.duo_push_rsa_key_path).expanduser() + if not key_path.exists(): + print(f"Duo RSA key not found: {key_path}") + return False + + try: + with open(key_path, 'rb') as f: + self._pubkey = RSA.import_key(f.read()) + return True + except Exception as e: + print(f"Failed to load Duo RSA key: {e}") + return False + + def approve_pending_push(self, timeout=30): + """Wait for and approve a single Duo push notification""" + if not self.is_configured(): + return False + + if not self._pubkey and not self.load_rsa_key(): + print("Cannot approve push: RSA key not available") + return False + + print("Polling for Duo push notification...") + start_time = time.monotonic() + poll_interval = 2 # Poll every 2 seconds + + while time.monotonic() - start_time < timeout: + try: + # Poll for transactions + transactions = self._get_transactions() + if self._state.debug: + print(f"Transactions response: {transactions}") + response_data = transactions.get('response', {}) + pending = response_data.get('transactions', []) + current_time = response_data.get('current_time', 0) + + if self._state.debug: + print(f"Found {len(pending)} pending transactions") + + if pending: + for tx in pending: + tx_id = tx.get('urgid') + expiration = tx.get('expiration', 0) + + if self._state.debug: + print(f"Transaction: {tx}") + + # Skip expired transactions + if expiration and current_time and expiration <= current_time: + if self._state.debug: + print(f"Skipping expired transaction {tx_id}") + continue + + if tx_id: + tx_summary = tx.get('summary', 'N/A') + print(f"Approving Duo push {tx_id[:12]}... ({tx_summary})") + response = self._reply_transaction(tx_id, 'approve') + if response.get('stat') == 'OK': + print("Duo push approved successfully") + return True + else: + print(f"Push approval returned: {response}") + + time.sleep(poll_interval) + + except Exception as e: + print(f"Error during Duo push approval: {e}") + return False + + return False + + def _generate_signature(self, method, path, time_str, data): + """Generate RSA signature for Duo API request""" + encoded_data = urlencode(sorted(data.items())) if data else "" + message_parts = [ + time_str, + method.upper(), + self._state.duo_push_host.lower(), + path, + encoded_data, + ] + message = "\n".join(message_parts).encode('ascii') + h = SHA512.new(message) + signature = pkcs1_15.new(self._pubkey).sign(h) + auth_string = f"{self._state.duo_push_pkey}:{base64.b64encode(signature).decode('ascii')}" + return "Basic " + base64.b64encode(auth_string.encode('ascii')).decode('ascii') + + def _make_request(self, method, path, data): + """Make authenticated request to Duo device API""" + dt = datetime.now(UTC) + # Format as RFC 2822 date for HTTP header (e.g., "Mon, 04 Nov 2025 12:34:56 GMT") + time_str = dt.strftime('%a, %d %b %Y %H:%M:%S GMT') + signature = self._generate_signature(method, path, time_str, data) + + url = f"https://{self._state.duo_push_host}{path}" + headers = { + 'Authorization': signature, + 'x-duo-date': time_str, + 'Host': self._state.duo_push_host, + 'Content-Type': 'application/x-www-form-urlencoded', + } + + try: + if method.upper() == 'GET': + r = requests.get(url, params=data, headers=headers, timeout=10) + else: + r = requests.post(url, data=data, headers=headers, timeout=10) + + r.raise_for_status() + return r.json() + except Exception as e: + print(f"Duo API request failed: {e}") + raise + + def _get_transactions(self): + """Get pending Duo push transactions""" + path = "/push/v2/device/transactions" + params = { + 'akey': self._state.duo_push_akey, + 'fips_status': '1', + 'hsm_status': 'true', + 'pkpush': 'rsa-sha512', + } + return self._make_request('GET', path, params) + + def _reply_transaction(self, transaction_id, answer): + """Reply to a Duo push transaction (approve/deny)""" + path = f"/push/v2/device/transactions/{transaction_id}" + data = { + 'akey': self._state.duo_push_akey, + 'answer': answer, + 'fips_status': '1', + 'hsm_status': 'true', + 'pkpush': 'rsa-sha512', + } + return self._make_request('POST', path, data)