From 74a7e044c55b6cbd5533ba28121f8fdfe0641291 Mon Sep 17 00:00:00 2001 From: Derb237 <198724448+Derb237@users.noreply.github.com> Date: Tue, 7 Oct 2025 10:17:56 +0000 Subject: [PATCH 1/4] Fix get_config to persist new Config rows and avoid DetachedInstanceError Two bugs were present in the get_config method: 1. Missing session.commit() after creating new Config object - New config rows were added to session but never committed - This caused empty database on fresh installations - Result: synack_domain and other config values were None 2. Session closed before accessing attributes - getattr() called after session.close() - SQLAlchemy tried to lazy-load attributes from closed session - This raised DetachedInstanceError on attribute access The fix: - Add session.commit() after session.add(config) - Store the return value before closing session - This ensures the config row is persisted and attributes are loaded This is a pre-existing bug that affects all fresh database installations. --- src/synack/plugins/db.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/synack/plugins/db.py b/src/synack/plugins/db.py index 852c2a9..00866f0 100644 --- a/src/synack/plugins/db.py +++ b/src/synack/plugins/db.py @@ -428,8 +428,10 @@ def get_config(self, name=None): if not config: config = Config() session.add(config) + session.commit() + ret = getattr(config, name) if name else config session.close() - return getattr(config, name) if name else config + return ret @property def http_proxy(self): From 11a8130a21ce6b0e30854ce113209bac94d7186c Mon Sep 17 00:00:00 2001 From: Derb237 <198724448+Derb237@users.noreply.github.com> Date: Thu, 6 Nov 2025 03:17:14 +0000 Subject: [PATCH 2/4] Fix spurious debug logging in api.py Fixed three bugs in the request error handling: 1. Line 155: Fixed typo where 'fail_reason' was set instead of 'reason_failed' for HTTP 412 status codes 2. Line 170: Moved unconditional debug log into a conditional that only logs terminal failures (400, 401, 403, 412) 3. Lines 147-151: Fixed each status code to set its own specific error message instead of reusing a shared 'Request failed' message: - 400: "Bad request" - 401: "Unauthorized" - 403: "Logged out" - 412: "Mission already claimed" Previously, when debug mode was enabled, every HTTP request would log 'MISSION ALREADY CLAIMED' regardless of the actual status code or context. Additionally, 401 errors were being logged with misleading messages because the reason_failed variable was being reused across different status codes. Bug introduced in commit c406ca9d (March 2, 2025) --- src/synack/plugins/api.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/synack/plugins/api.py b/src/synack/plugins/api.py index 1cc70c1..5dd6b11 100644 --- a/src/synack/plugins/api.py +++ b/src/synack/plugins/api.py @@ -144,13 +144,15 @@ def request(self, method, path, attempts=0, **kwargs): f"\n\tData: {data}" + f"\n\tContent: {res.content}") - reason_failed = 'Request failed' - if res.status_code in [ 400, 401 ]: - reason_failed = 'Request failed' + reason_failed = None + if res.status_code == 400: + reason_failed = 'Bad request' + elif res.status_code == 401: + reason_failed = 'Unauthorized' elif res.status_code == 403: reason_failed = 'Logged out' elif res.status_code == 412: - fail_reason = 'Mission already claimed' + reason_failed = 'Mission already claimed' elif res.status_code == 429: self._debug.log('Too many requests', f'({res.status_code} - {res.reason}) {res.url}') if attempts < 5: @@ -165,6 +167,8 @@ def request(self, method, path, attempts=0, **kwargs): attempts += 1 return self.request(method, path, attempts, **kwargs) - self._debug.log('Mission already claimed', f'({res.status_code} - {res.reason}) {res.url}') + # Log terminal failures (non-retryable errors) + if res.status_code in [400, 401, 403, 412]: + self._debug.log(reason_failed, f'({res.status_code} - {res.reason}) {res.url}') return res From da2f260b8393c1b4a64cded81390c1ed9d865877 Mon Sep 17 00:00:00 2001 From: Derb237 <198724448+Derb237@users.noreply.github.com> Date: Thu, 6 Nov 2025 04:52:15 +0000 Subject: [PATCH 3/4] Fix authentication error handling to prevent account lockout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove infinite retry loop on invalid credentials (400) - Clear stored email/password when authentication fails - Add explicit error handling for account locked (423) - Make 400 and 423 non-retryable in API layer to prevent rapid-fire retries - Raise clear error messages for both authentication failure scenarios This prevents the previous behavior where entering wrong credentials would trigger rapid retries that locked the account. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/synack/plugins/api.py | 4 +++- src/synack/plugins/auth.py | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/synack/plugins/api.py b/src/synack/plugins/api.py index 5dd6b11..19f4ef3 100644 --- a/src/synack/plugins/api.py +++ b/src/synack/plugins/api.py @@ -153,6 +153,8 @@ def request(self, method, path, attempts=0, **kwargs): reason_failed = 'Logged out' elif res.status_code == 412: reason_failed = 'Mission already claimed' + elif res.status_code == 423: + reason_failed = 'Locked' elif res.status_code == 429: self._debug.log('Too many requests', f'({res.status_code} - {res.reason}) {res.url}') if attempts < 5: @@ -168,7 +170,7 @@ def request(self, method, path, attempts=0, **kwargs): return self.request(method, path, attempts, **kwargs) # Log terminal failures (non-retryable errors) - if res.status_code in [400, 401, 403, 412]: + if res.status_code in [400, 401, 403, 412, 423]: self._debug.log(reason_failed, f'({res.status_code} - {res.reason}) {res.url}') return res diff --git a/src/synack/plugins/auth.py b/src/synack/plugins/auth.py index 5f569c2..1195ce8 100644 --- a/src/synack/plugins/auth.py +++ b/src/synack/plugins/auth.py @@ -62,9 +62,11 @@ def get_authentication_response(self, csrf): if res.status_code == 200: return res.json() elif res.status_code == 400: - csrf = self.get_login_csrf() - if csrf: - return self.get_authentication_response(csrf) + self._db.email = '' + self._db.password = '' + raise ValueError("Invalid email or password. Please run the script again to re-enter credentials.") + elif res.status_code == 423: + raise ValueError("Your account has been locked due to too many failed login attempts. Please wait and try again later.") def get_login_csrf(self): """Get the CSRF Token from the login page""" From c80c175c756bf983e570865276600d775d088aeb Mon Sep 17 00:00:00 2001 From: Derb237 <198724448+Derb237@users.noreply.github.com> Date: Thu, 6 Nov 2025 07:22:18 +0000 Subject: [PATCH 4/4] Add Duo Push auto-approval and HOTP hex conversion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements three-priority Duo MFA authentication system: 1. OTP (highest priority) - Auto-generates HOTP codes from secret 2. Auto-approval - Uses device credentials to approve pushes via Duo API 3. Manual push (fallback) - Traditional approve-on-phone flow Database changes: - Add duo_push_akey, duo_push_pkey, duo_push_host, duo_push_rsa_key_path columns - Add duo_device column to persist user's selected device - Migration: 20522d39dc63_add_duo_push_method Duo Push auto-approval: - Integrate with Duo device API using RSA-SHA512 signed requests - Load device credentials from database and RSA key from file - Poll for pending push notifications and auto-approve - Hard fail if auto-approval is configured but broken (prevents hanging) - Auto-correct duo_device when credentials don't match selected device HOTP hex secret auto-conversion: - Auto-detect 32-char hex format (from synackDUO's hotp_secret) - Convert by treating hex string as UTF-8, then base32 encode - Based on duo-hotp reference implementation - Accepts both hex (hotp_secret) and base32 (otpauth://) formats 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/src/usage/index.md | 8 +- docs/src/usage/main-components/state.md | 6 +- docs/src/usage/plugins/duo.md | 71 +++++ src/synack/_state.py | 60 ++++ .../6f542023f57e_add_duo_push_method.py | 38 +++ src/synack/db/models/config.py | 7 + src/synack/plugins/db.py | 79 +++++- src/synack/plugins/duo.py | 267 +++++++++++++++++- 8 files changed, 515 insertions(+), 21 deletions(-) create mode 100644 src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py diff --git a/docs/src/usage/index.md b/docs/src/usage/index.md index 1031492..f343803 100644 --- a/docs/src/usage/index.md +++ b/docs/src/usage/index.md @@ -17,12 +17,8 @@ With that in mind, I would highly recommend you become familiar with the [Plugin ## Authentication The first time you try to do anything which requires authentication, you will be automatically prompted for your credentials. -This prompt will expect the `Synack Email` and `Synack Password`, which are fairly self explanitory, but it also asks for the `Synack OTP Secret`. +This prompt will expect the `Synack Email` and `Synack Password`, which are fairly self explanatory. -The `Synack OTP Secret` is NOT the 8 digit code you pull out of Authy. -Instead, it is a string that you must extract from Authy via a method similar to the one found [here](https://gist.github.com/gboudreau/94bb0c11a6209c82418d01a59d958c93). - -Use the above instructions at your own discression. -I TAKE NO RESPONSIBILITY IF SOMETHING BAD HAPPENS AS A RESULT. +For Duo MFA setup options, see the [Duo plugin documentation](./plugins/duo.md). Once you complete these steps, your credentials are stored in a SQLiteDB at `~/.config/synack/synackapi.db`. diff --git a/docs/src/usage/main-components/state.md b/docs/src/usage/main-components/state.md index 21df449..4f8fc90 100644 --- a/docs/src/usage/main-components/state.md +++ b/docs/src/usage/main-components/state.md @@ -55,12 +55,16 @@ In the event that one of the State variables is set and is **not** constantly at | api_token | str | This is the Synack Access Token used to authenticate requests | config_dir | pathlib.Path | The location of the Database and Login script | debug | bool | Used to show/hide debugging messages +| duo_push_akey | str | Duo device activation key for push auto-approval +| duo_push_host | str | Duo API hostname for push auto-approval +| duo_push_pkey | str | Duo device private key for push auto-approval +| duo_push_rsa_key_path | str | Path to RSA private key for signing Duo API requests | email | str | Your email address used to log into Synack | http_proxy | str | A Web Proxy (Burp, etc.) to intercept requests | https_proxy | str | A Web Proxy (Burp, etc.) to intercept requests | login | bool | Used to enable/disable a check of the api_token upon creation of the Handler | notifications_token | str | Token used for authentication when dealing with Synack Notifications -| otp_secret | str | OTP Secret held by Authy. NOT an OTP. For more information, read the Usage page +| otp_secret | str | OTP Secret held by Duo Mobile. NOT an OTP. For more information, read the Usage page | password | str | Your Synack Password | session | requests.Session | Tracks cookies and headers across various functions | template_dir | pathlib.Path | The location of your Mission Templates diff --git a/docs/src/usage/plugins/duo.md b/docs/src/usage/plugins/duo.md index 0347edd..990f4ef 100644 --- a/docs/src/usage/plugins/duo.md +++ b/docs/src/usage/plugins/duo.md @@ -1,5 +1,58 @@ # Duo +## Duo MFA Options + +When prompted during authentication, you can choose from three options: + +**Option 1: Manual Push Approval (Simplest)** +- Press Enter when prompted for OTP Secret +- Approve push notifications on your phone each time the token is expired +- No additional setup required + +**Option 2: Automated OTP (Preferred)** +- Enter your OTP Secret when prompted (accepts both hex and base32 formats) +- Automatically generates OTP codes using a counter (saved in the database) +- Extract the `hotp_secret` from Duo Mobile using [synackDUO](https://github.com/dinosn/synackDUO) (see `response.json`) +- **Note:** This is NOT the 8-digit codes from Duo Mobile, but the HOTP secret key + +**Option 3: Automated Duo Push** +- Uses Duo credentials to auto-approve push requests +- Can also approve push requests using duo.approve_pending_push(timeout) +- Extract credentials using [synackDUO](https://github.com/dinosn/synackDUO) (see `response.json`) (see below) + + +**Disclaimer:** Use the above instructions at your own discretion. I TAKE NO RESPONSIBILITY IF SOMETHING BAD HAPPENS AS A RESULT. + +## Duo Push Auto-Approval Setup + +The Duo plugin supports push notification approval using device credentials. + +### Prerequisites + +You must extract and configure four credentials from Duo Mobile: + +| Credential | Description | Example +| --- | --- | --- +| `duo_push_akey` | Device activation key | `DAXXXXXXXXXXXXXXXXXXXX` +| `duo_push_pkey` | Device private key | `DPXXXXXXXXXXXXXXXXXXXX` +| `duo_push_host` | Duo API hostname | `api-xxxxxxxx.duosecurity.com` +| `duo_push_rsa_key_path` | Path to RSA private key | `~/.config/synack/duo/key.pem` + +### Configuration + +Set credentials in the database: +PP +```python +import synack + +h = synack.Handler(login=False) + +h.db.set_config('duo_push_akey', 'DAXXXXXXXXXXXXXXXXXX') +h.db.set_config('duo_push_pkey', 'DPXXXXXXXXXXXXXXXXXX') +h.db.set_config('duo_push_host', 'api-xxxxxxxx.duosecurity.com') +h.db.set_config('duo_push_rsa_key_path', 'synackDUO/key.pem') +``` + ## duo.get_grant_token(auth_url) > Handles Duo Security MFA stages and returns the grant_token used to finish logging into Synack @@ -13,3 +66,21 @@ >> >>> h.duo.get_grant_token('https:///...duosecurity.com/...') >> 'Y8....6g' >> ``` + +## duo.approve_pending_push(timeout) + +> Wait for and approve a single Duo push notification +> +> Polls Duo's device API for pending push notifications and automatically approves the first one found. Useful for automated workflows that need to handle Duo MFA. +> +> | Argument | Type | Default | Description +> | --- | --- | --- | --- +> | `timeout` | int | 30 | Maximum seconds to wait for a push notification +> +> Returns `True` if a push was approved, `False` if timeout or error occurred. +> +>> Examples +>> ```python3 +>> >>> h.duo.approve_pending_push(timeout=60) +>> True +>> ``` diff --git a/src/synack/_state.py b/src/synack/_state.py index 47348c2..ff09c87 100644 --- a/src/synack/_state.py +++ b/src/synack/_state.py @@ -41,6 +41,11 @@ def __init__(self): self._use_proxies = None self._use_scratchspace = None self._user_id = None + self._duo_push_akey = None + self._duo_push_pkey = None + self._duo_push_host = None + self._duo_push_rsa_key_path = None + self._duo_device = None @property def smtp_email_from(self) -> str: @@ -357,3 +362,58 @@ def user_id(self) -> str: @user_id.setter def user_id(self, value: str) -> None: self._user_id = value + + @property + def duo_push_akey(self) -> str: + ret = self._duo_push_akey + if ret is None: + ret = self._db.duo_push_akey + return ret + + @duo_push_akey.setter + def duo_push_akey(self, value: str) -> None: + self._duo_push_akey = value + + @property + def duo_push_pkey(self) -> str: + ret = self._duo_push_pkey + if ret is None: + ret = self._db.duo_push_pkey + return ret + + @duo_push_pkey.setter + def duo_push_pkey(self, value: str) -> None: + self._duo_push_pkey = value + + @property + def duo_push_host(self) -> str: + ret = self._duo_push_host + if ret is None: + ret = self._db.duo_push_host + return ret + + @duo_push_host.setter + def duo_push_host(self, value: str) -> None: + self._duo_push_host = value + + @property + def duo_push_rsa_key_path(self) -> str: + ret = self._duo_push_rsa_key_path + if ret is None: + ret = self._db.duo_push_rsa_key_path + return ret + + @duo_push_rsa_key_path.setter + def duo_push_rsa_key_path(self, value: str) -> None: + self._duo_push_rsa_key_path = value + + @property + def duo_device(self) -> str: + ret = self._duo_device + if ret is None: + ret = self._db.duo_device + return ret + + @duo_device.setter + def duo_device(self, value: str) -> None: + self._duo_device = value diff --git a/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py b/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py new file mode 100644 index 0000000..0fdde9a --- /dev/null +++ b/src/synack/db/alembic/versions/6f542023f57e_add_duo_push_method.py @@ -0,0 +1,38 @@ +"""add duo push method + +Revision ID: 6f542023f57e +Revises: 1434aa7ed47c +Create Date: 2025-11-06 08:23:42.181054 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '6f542023f57e' +down_revision = '1434aa7ed47c' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('config') as batch_op: + batch_op.add_column(sa.Column('duo_push_akey', sa.VARCHAR(length=200), nullable=True)) + batch_op.add_column(sa.Column('duo_push_pkey', sa.VARCHAR(length=200), nullable=True)) + batch_op.add_column(sa.Column('duo_push_host', sa.VARCHAR(length=100), nullable=True)) + batch_op.add_column(sa.Column('duo_push_rsa_key_path', sa.VARCHAR(length=250), nullable=True)) + batch_op.add_column(sa.Column('duo_device', sa.VARCHAR(length=50), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('config') as batch_op: + batch_op.drop_column('duo_device') + batch_op.drop_column('duo_push_rsa_key_path') + batch_op.drop_column('duo_push_host') + batch_op.drop_column('duo_push_pkey') + batch_op.drop_column('duo_push_akey') + # ### end Alembic commands ### diff --git a/src/synack/db/models/config.py b/src/synack/db/models/config.py index 943f1da..216b467 100644 --- a/src/synack/db/models/config.py +++ b/src/synack/db/models/config.py @@ -38,3 +38,10 @@ class Config(Base): user_id = sa.Column(sa.VARCHAR(20), default='') use_proxies = sa.Column(sa.BOOLEAN, default=False) use_scratchspace = sa.Column(sa.BOOLEAN, default=False) + duo_push_akey = sa.Column(sa.VARCHAR(200), default='') + duo_push_pkey = sa.Column(sa.VARCHAR(200), default='') + duo_push_host = sa.Column(sa.VARCHAR(100), default='') + duo_push_rsa_key_path = sa.Column( + sa.VARCHAR(250), default='' + ) + duo_device = sa.Column(sa.VARCHAR(50), default='') diff --git a/src/synack/plugins/db.py b/src/synack/plugins/db.py index 00866f0..21a828d 100644 --- a/src/synack/plugins/db.py +++ b/src/synack/plugins/db.py @@ -480,14 +480,49 @@ def otp_count(self, value): def otp_secret(self): ret = self.get_config('otp_secret') if not ret: - ret = input('Synack OTP Secret: ') - self.otp_secret = ret + # Skip prompt if automated push credentials are already configured + if self.duo_push_akey and self.duo_push_pkey and self.duo_push_host: + ret = '' + self.otp_secret = ret + # Skip prompt if user has already selected a device for manual push + elif self.duo_device: + ret = '' + else: + print("\nDuo MFA Authentication Setup:") + print( + "1. Press Enter to use Duo Push notifications " + "(you'll approve on your phone)" + ) + print("2. OR enter your Duo OTP Secret for automated passcode generation") + print(" (Accepts hex (hotp_secret) or base32 (otpauth://) format)") + ret = input('\nDuo OTP Secret (or press Enter for push): ').strip() + self.otp_secret = ret if ret else '' return ret @otp_secret.setter def otp_secret(self, value): + # Auto-detect and convert hex format to base32 + # Duo's hotp_secret is a hex string, but needs to be treated as UTF-8 + # not as hex bytes (based on duo-hotp reference implementation) + if value and self._is_hex_secret(value): + import base64 + # Encode the hex string as UTF-8 bytes, then base32 + value = base64.b32encode(value.encode('utf-8')).decode('ascii').rstrip('=') self.set_config('otp_secret', value) + def _is_hex_secret(self, value): + """Check if the secret appears to be in hex format (not base32)""" + # Hex: 32 chars using only 0-9, a-f + # Base32: variable length using A-Z, 2-7 + if len(value) != 32: + return False + try: + # If it can be decoded as hex, it's hex + bytes.fromhex(value) + return True + except ValueError: + return False + @property def password(self): ret = self.get_config('password') @@ -500,6 +535,46 @@ def password(self): def password(self, value): self.set_config('password', value) + @property + def duo_push_akey(self): + return self.get_config('duo_push_akey') + + @duo_push_akey.setter + def duo_push_akey(self, value): + self.set_config('duo_push_akey', value) + + @property + def duo_push_pkey(self): + return self.get_config('duo_push_pkey') + + @duo_push_pkey.setter + def duo_push_pkey(self, value): + self.set_config('duo_push_pkey', value) + + @property + def duo_push_host(self): + return self.get_config('duo_push_host') + + @duo_push_host.setter + def duo_push_host(self, value): + self.set_config('duo_push_host', value) + + @property + def duo_push_rsa_key_path(self): + return self.get_config('duo_push_rsa_key_path') + + @duo_push_rsa_key_path.setter + def duo_push_rsa_key_path(self, value): + self.set_config('duo_push_rsa_key_path', value) + + @property + def duo_device(self): + return self.get_config('duo_device') + + @duo_device.setter + def duo_device(self, value): + self.set_config('duo_device', value) + @property def ports(self): session = self.Session() diff --git a/src/synack/plugins/duo.py b/src/synack/plugins/duo.py index be3ff23..2ddcb4a 100644 --- a/src/synack/plugins/duo.py +++ b/src/synack/plugins/duo.py @@ -9,7 +9,14 @@ import json import pyotp import re +import requests import time +from datetime import UTC, datetime +from pathlib import Path +from urllib.parse import urlencode +from Crypto.Hash import SHA512 +from Crypto.PublicKey import RSA +from Crypto.Signature import pkcs1_15 class Duo(Plugin): @@ -33,6 +40,7 @@ def __init__(self, *args, **kwargs): self._sid = None self._txid = None self._xsrf = None + self._pubkey = None def _build_headers(self, overrides=None): headers = { @@ -56,7 +64,35 @@ def get_grant_token(self, auth_url): self._set_session_variables() # Yes, this needs to be called twice... self._get_txid() if self._txid: - self._get_status() + # Priority 1: OTP (if configured) + if self._state.otp_secret: + # OTP passcode already sent in _get_txid(), just poll for status + self._get_status() + # Priority 2: Auto-approval (if configured) - HARD FAIL if broken + elif self.is_configured(): + if not self.load_rsa_key(): + raise RuntimeError( + "Duo Push auto-approval is enabled but RSA key failed to load" + ) + print("Auto-approving Duo push notification...") + if self._state.debug: + print(f"Using device: {self._device}") + print(f"Configured duo_device: {self._state.duo_device}") + if self._device != self._state.duo_device: + print(f"WARNING: Push sent to {self._device} but credentials are for {self._state.duo_device}") + # Wait 2 seconds before polling to give Duo time to register the push + time.sleep(2) + if not self.approve_pending_push(timeout=25): + raise RuntimeError( + "Duo Push auto-approval failed - check credentials or " + "disable auto-approval. Ensure duo_device matches the device " + "with extracted credentials." + ) + self._get_status() + # Priority 3: Manual push (fallback) + else: + print("Waiting for manual Duo push approval on your device...") + self._get_status() if self._status == 'SUCCESS': self._get_oidc_exit() if self._progress_token: @@ -103,16 +139,69 @@ def _get_mfa_details(self): 'sid': self._sid } res = self._api.request('GET', f'{self._base_url}/frame/v4/auth/prompt/data', headers=headers, query=query) + if res.status_code == 200: - for method in res.json().get('response', {}).get('auth_method_order', []): - if method.get('factor', '') == 'Duo Push': - device_key = method.get('deviceKey', '') - break + response_json = res.json() + response_data = response_json.get('response', {}) + phones = response_data.get('phones', []) + + # If auto-approval credentials are configured, find the matching device + if self.is_configured(): + # Match device by pkey + pkey = self._state.duo_push_pkey + for phone in phones: + if phone.get('key', '') == pkey: + self._device = phone.get('index', '') + self._factor = 'Duo Push' + # Update stored device if it doesn't match + if self._state.duo_device != self._device: + print(f"Auto-correcting duo_device from {self._state.duo_device} to {self._device}") + self._db.duo_device = self._device + return + # If no match found, credentials are for wrong account + print(f"WARNING: duo_push_pkey {pkey} not found in available devices") + print("Falling back to manual device selection") + + # Check if we have a stored device preference + if self._state.duo_device: + # Use the stored device + for phone in phones: + if phone.get('index', '') == self._state.duo_device: + self._device = phone.get('index', '') + self._factor = 'Duo Push' + return + # If stored device not found, fall through to prompt - for phone in res.json().get('response', {}).get('phones', []): - if phone.get('key', '') == device_key: - self._device = phone.get('index', '') - self._factor = 'Duo Push' + # Prompt user to select a device + if phones: + print("\nAvailable Duo devices:") + for i, phone in enumerate(phones, 1): + print(f"{i}. {phone.get('name', 'Unknown')} ({phone.get('index', '')})") + + while True: + try: + choice = input("\nSelect device number (or press Enter for first device): ").strip() + if not choice: + selected_phone = phones[0] + break + choice_num = int(choice) + if 1 <= choice_num <= len(phones): + selected_phone = phones[choice_num - 1] + break + print(f"Please enter a number between 1 and {len(phones)}") + except ValueError: + print("Please enter a valid number") + + self._device = selected_phone.get('index', '') + self._factor = 'Duo Push' + self._db.duo_device = self._device + return + + if not self._device or not self._factor: + raise ValueError( + f'Failed to determine MFA device/factor from Duo API. ' + f'HTTP {res.status_code}, device={self._device}, factor={self._factor}' + ) def _get_oidc_exit(self): headers = { @@ -204,7 +293,8 @@ def _get_status(self): 'txid': self._txid, 'sid': self._sid } - for i in range(5): + # Increase polling attempts from 5 to 12 (1 minute total with 5s intervals) + for i in range(12): res = self._api.request('POST', f'{self._base_url}/frame/v4/status', headers=headers, data=data) if res.status_code == 200: status_enum = res.json().get('response', {}).get('status_enum', -1) @@ -223,8 +313,9 @@ def _get_status(self): break elif status_enum == 13: # Awaiting Push Notification pass - elif status_enum == 15: # Push Notification MFA Blocked - break + elif status_enum == 15: # Push sent, waiting for approval + # Continue polling for both auto-approval and manual approval + pass elif status_enum == 44: # Prior Code self._db.otp_count += 5 break @@ -297,3 +388,155 @@ def _set_session_variables(self): res = self._api.request('POST', self._referrer, headers=headers, data=self._session_vars) if res.status_code == 200: self._referrer = res.url + + # Duo Push Auto-Approval Methods + + def is_configured(self): + """Check if Duo push auto-approval credentials are configured""" + return ( + self._state.duo_push_akey and + self._state.duo_push_pkey and + self._state.duo_push_host + ) + + def load_rsa_key(self): + """Load RSA key from configured path""" + if not self.is_configured(): + return False + + key_path = Path(self._state.duo_push_rsa_key_path).expanduser() + if not key_path.exists(): + print(f"Duo RSA key not found: {key_path}") + return False + + try: + with open(key_path, 'rb') as f: + self._pubkey = RSA.import_key(f.read()) + return True + except Exception as e: + print(f"Failed to load Duo RSA key: {e}") + return False + + def approve_pending_push(self, timeout=30): + """Wait for and approve a single Duo push notification""" + if not self.is_configured(): + return False + + if not self._pubkey and not self.load_rsa_key(): + print("Cannot approve push: RSA key not available") + return False + + print("Polling for Duo push notification...") + start_time = time.monotonic() + poll_interval = 2 # Poll every 2 seconds + + while time.monotonic() - start_time < timeout: + try: + # Poll for transactions + transactions = self._get_transactions() + if self._state.debug: + print(f"Transactions response: {transactions}") + response_data = transactions.get('response', {}) + pending = response_data.get('transactions', []) + current_time = response_data.get('current_time', 0) + + if self._state.debug: + print(f"Found {len(pending)} pending transactions") + + if pending: + for tx in pending: + tx_id = tx.get('urgid') + expiration = tx.get('expiration', 0) + + if self._state.debug: + print(f"Transaction: {tx}") + + # Skip expired transactions + if expiration and current_time and expiration <= current_time: + if self._state.debug: + print(f"Skipping expired transaction {tx_id}") + continue + + if tx_id: + tx_summary = tx.get('summary', 'N/A') + print(f"Approving Duo push {tx_id[:12]}... ({tx_summary})") + response = self._reply_transaction(tx_id, 'approve') + if response.get('stat') == 'OK': + print("Duo push approved successfully") + return True + else: + print(f"Push approval returned: {response}") + + time.sleep(poll_interval) + + except Exception as e: + print(f"Error during Duo push approval: {e}") + return False + + return False + + def _generate_signature(self, method, path, time_str, data): + """Generate RSA signature for Duo API request""" + encoded_data = urlencode(sorted(data.items())) if data else "" + message_parts = [ + time_str, + method.upper(), + self._state.duo_push_host.lower(), + path, + encoded_data, + ] + message = "\n".join(message_parts).encode('ascii') + h = SHA512.new(message) + signature = pkcs1_15.new(self._pubkey).sign(h) + auth_string = f"{self._state.duo_push_pkey}:{base64.b64encode(signature).decode('ascii')}" + return "Basic " + base64.b64encode(auth_string.encode('ascii')).decode('ascii') + + def _make_request(self, method, path, data): + """Make authenticated request to Duo device API""" + dt = datetime.now(UTC) + # Format as RFC 2822 date for HTTP header (e.g., "Mon, 04 Nov 2025 12:34:56 GMT") + time_str = dt.strftime('%a, %d %b %Y %H:%M:%S GMT') + signature = self._generate_signature(method, path, time_str, data) + + url = f"https://{self._state.duo_push_host}{path}" + headers = { + 'Authorization': signature, + 'x-duo-date': time_str, + 'Host': self._state.duo_push_host, + 'Content-Type': 'application/x-www-form-urlencoded', + } + + try: + if method.upper() == 'GET': + r = requests.get(url, params=data, headers=headers, timeout=10) + else: + r = requests.post(url, data=data, headers=headers, timeout=10) + + r.raise_for_status() + return r.json() + except Exception as e: + print(f"Duo API request failed: {e}") + raise + + def _get_transactions(self): + """Get pending Duo push transactions""" + path = "/push/v2/device/transactions" + params = { + 'akey': self._state.duo_push_akey, + 'fips_status': '1', + 'hsm_status': 'true', + 'pkpush': 'rsa-sha512', + } + return self._make_request('GET', path, params) + + def _reply_transaction(self, transaction_id, answer): + """Reply to a Duo push transaction (approve/deny)""" + path = f"/push/v2/device/transactions/{transaction_id}" + data = { + 'akey': self._state.duo_push_akey, + 'answer': answer, + 'fips_status': '1', + 'hsm_status': 'true', + 'pkpush': 'rsa-sha512', + } + return self._make_request('POST', path, data)