Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to login via sherriff id verification #510

Merged
merged 2 commits into from
Dec 14, 2024

Conversation

ravi-bharadwaj
Copy link
Contributor

No description provided.

@ravi-bharadwaj ravi-bharadwaj changed the title changes to send proper error code back to the user Changes to login via sherriff id verification Dec 6, 2024
Copy link

@Syerramsetti915 Syerramsetti915 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working as expected

@mw66
Copy link

mw66 commented Dec 6, 2024

@jmfernandes this is an important fix, can you take a look and merge it? Thanks.

raise Exception("Challenge not validated")
raise Exception("Id not returned in user-machine call")

def _get_sherrif_challenge(token_id:str):
Copy link

@tamablevirus tamablevirus Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is _get_sheriff_challenge used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a new function used only in one place

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hes referring to line 241 function, not the validation handler, of which he is correct, the function starting at line 241 is not called anywhere and then the logic calls data value without it being in the args nor subscripted to anything. Just trying to help, the rest of the script seems pretty good

'flow': 'suv',
'input':{'workflow_id': workflow_id}
}
data = request_post(url=url, payload=payload,json=True )

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT extra space at end of request_post params

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, I had to disable autoformatting on my end as it was introducing lot of changes on PR.

@Adelantado
Copy link

Thanks for all the time and effort, it is very much appreciated, I wish I could help but this is way above my head.
Just to let you know that fix did not work for me, issue persist and that for an unknown and unexpected reason a copy and paste of the code results in an undefined "data" .... unsure if this the reason why it does not work for me.

Data not defined

@bfaircloth1
Copy link

bfaircloth1 commented Dec 7, 2024

Your hard work and devotion are greatly appreciated. I am receiving another KeyError - status. My Robinhood account is a newer account, opened in March 2024. I'm fairly new to Python so there's not much hope for me helping out. You guys rock though!
Picture1

UPDATE: I'm getting the MFA code sent to me, but the script isn't asking me for it. So, I put a breakpoint at this line,
challenge_payload = {
'response': mfa_code
}
and manually entered the mfa_code that I received in the debugger. This worked and I was logged in after this.

I'm a noob at Python and woefully unqualified to be posting anything here, but here are modifications I made to the _validate_sherrif_id method that worked for me (Python 3.12.7 and Spyder 6.0.1):

def _validate_sherrif_id(device_token:str, workflow_id:str, mfa_code:str=None):
  url = "https://api.robinhood.com/pathfinder/user_machine/"
  payload = {
      'device_id': device_token,
      'flow': 'suv',
      'input': {'workflow_id': workflow_id}
  }
  data = request_post(url=url, payload=payload, json=True)
  if "id" in data:
      inquiries_url = f"https://api.robinhood.com/pathfinder/inquiries/{data['id']}/user_view/"
      res = request_get(inquiries_url)
      challenge_id = res['type_context']["context"]["sheriff_challenge"]["id"]
      challenge_url = f"https://api.robinhood.com/challenge/{challenge_id}/respond/"

      if mfa_code is None:
          # Add manual input for MFA code
          mfa_code = input("Please enter your MFA code: ")

      challenge_payload = {
          'response': mfa_code
      }
      challenge_response = request_post(url=challenge_url, payload=challenge_payload, json=True)

      if challenge_response["status"] == "validated":
          inquiries_payload = {"sequence": 0, "user_input": {"status": "continue"}}
          inquiries_response = request_post(url=inquiries_url, payload=inquiries_payload, json=True)
          if inquiries_response["type_context"]["result"] == "workflow_status_approved":
              return
          else:
              raise Exception("Workflow status not approved")
      else:
          raise Exception("Challenge not validated")
  else:
      raise Exception("ID not returned in user-machine call")

@MaxxRK
Copy link

MaxxRK commented Dec 7, 2024

This fixed the login error for me

@Gates8911
Copy link

Gates8911 commented Dec 8, 2024

For anyone else still stuck I found a work around after using the new module that included the extra validation (but still wasnt allowing requests, would return normal like i was logged in but I was not actually logged in, always failed at first request logic). I had to set up 3rd party authentication via the robinhood app, I used google auth app, but I dont think what you use for this matters, as long as the app will generate the totp_secret (random 6 digit code periodically generated and can be requested once you set up the robinhood app to auth using said app. After this I put any sensitive requests into my user environment variables via "edit the system environment variables". After all of this is adjusted, the login is as follows: (I realize to many of you this message is unnecessary, but i wanted to contribute any way i can, had no idea there were this many people trying to keep this module going, greatly appreciate you, and if anyone is interested in indicators and strategies 4 crypto & stocks I have any indicator you could possibly name + some experimentals in testing phase, some great performing strategies that are optimized for dry markets, so they typically dont need much volitility to perform well, and finally I have 3 machine learning models used for predictions (test phase, showing promise, gpu required to run them) u can contact me at mgates8900@gmail.com if any of this interests anyone, but i digress, now onto the work around for the api update):
totp_secret = os.environ['mfa_code'] # set with code you used when you set up 3rd party app to communicate with RobinH
totp = pyotp.TOTP(totp_secret).now()
username = os.environ['your_username']
password = os.environ['your_password']
login_data = robinhood.login(username, password, expiresIn=86400, scope='internal', store_session=True, mfa_code=totp, pickle_name="Name_file_optional")
rs.update_session("access_token", login_data["access_token"]) # manually updates the session instead of relying on auth func
rs.update_session("refresh_token", login_data.get("refresh_token")) #same, happy coding gentlemen :)

@Christopher-C-Robinson
Copy link

This works for me. Will be using this branch until merged in.

@ravi-bharadwaj
Copy link
Contributor Author

@jmfernandes let us know if you have any concerns for merging the PR.

@Two20Two21
Copy link

Thanks everyone for getting this resolved.

Do we understand the root cause? Did the API change?

@Gates8911
Copy link

@Two20Two21 From what I have gathered the root cause was they updated the API to require 2 factor authentication, for example i couldnt get the new authentication script to work until i used a 3rd party app to verify login, whereas before i had device approvals as my security method, and that used to work just fine, it would prompt me to verify via mobile phone when logging the bot in, now if you try this it will still prompt you to verfy new device login but the script will get a key error, so they added an extra step to their security, as for all the details someone else would have to comment that has more experience with that data.

@jmfernandes jmfernandes merged commit 45dd9ed into jmfernandes:master Dec 14, 2024
@samboy84
Copy link

Iam using R and below is the syntax Iam using, it was working last week but now getting below error. Please provide correct syntax to use. Thanks

RH <- RobinHood ("gmail.com", "password", mfa_code="012601")
Error in RobinHood::api_login(username, password, mfa_code) :
Forbidden (HTTP 403)

@tsikerdekis
Copy link

This is honestly the only thing that worked for me.

Your hard work and devotion are greatly appreciated. I am receiving another KeyError - status. My Robinhood account is a newer account, opened in March 2024. I'm fairly new to Python so there's not much hope for me helping out. You guys rock though! Picture1

UPDATE: I'm getting the MFA code sent to me, but the script isn't asking me for it. So, I put a breakpoint at this line, challenge_payload = { 'response': mfa_code } and manually entered the mfa_code that I received in the debugger. This worked and I was logged in after this.

I'm a noob at Python and woefully unqualified to be posting anything here, but here are modifications I made to the _validate_sherrif_id method that worked for me (Python 3.12.7 and Spyder 6.0.1):

def _validate_sherrif_id(device_token:str, workflow_id:str, mfa_code:str=None):
  url = "https://api.robinhood.com/pathfinder/user_machine/"
  payload = {
      'device_id': device_token,
      'flow': 'suv',
      'input': {'workflow_id': workflow_id}
  }
  data = request_post(url=url, payload=payload, json=True)
  if "id" in data:
      inquiries_url = f"https://api.robinhood.com/pathfinder/inquiries/{data['id']}/user_view/"
      res = request_get(inquiries_url)
      challenge_id = res['type_context']["context"]["sheriff_challenge"]["id"]
      challenge_url = f"https://api.robinhood.com/challenge/{challenge_id}/respond/"

      if mfa_code is None:
          # Add manual input for MFA code
          mfa_code = input("Please enter your MFA code: ")

      challenge_payload = {
          'response': mfa_code
      }
      challenge_response = request_post(url=challenge_url, payload=challenge_payload, json=True)

      if challenge_response["status"] == "validated":
          inquiries_payload = {"sequence": 0, "user_input": {"status": "continue"}}
          inquiries_response = request_post(url=inquiries_url, payload=inquiries_payload, json=True)
          if inquiries_response["type_context"]["result"] == "workflow_status_approved":
              return
          else:
              raise Exception("Workflow status not approved")
      else:
          raise Exception("Challenge not validated")
  else:
      raise Exception("ID not returned in user-machine call")

@regholl2023
Copy link

ok,I'm like totally confused.What is the full correct code we are supposed to be using?

@samboy84
Copy link

ok,I'm like totally confused.What is the full correct code we are supposed to be using?

Load required libraries

library(httr)
library(jsonlite)
library(uuid)
library(otp)

Function to authenticate and handle TOTP

RobinHoodAuth <- function(username, password, totp_secret) {

Generate TOTP code using the provided secret

totp <- TOTP$new(totp_secret)
mfa_code <- totp$now()

Generate a device token for unique identification

device_token <- uuid::UUIDgenerate()

Prepare the login payload

login_payload <- list(
username = username,
password = password,
grant_type = "password",
scope = "internal",
expires_in = 86400,
client_id = "c82SH0WZOsabOXGP2sxqcj34",
device_token = device_token,
mfa_code = mfa_code
)

API URL for authentication

login_url <- "https://api.robinhood.com/oauth2/token/"

Send POST request to authenticate

response <- httr::POST(
login_url,
body = toJSON(login_payload, auto_unbox = TRUE),
encode = "json",
add_headers(
Accept = "application/json",
Content-Type = "application/json"
)
)

Check response status

if (response$status_code == 200) {
login_data <- content(response, as = "parsed")

# Manually update the session with access and refresh tokens
access_token <- login_data$access_token
refresh_token <- login_data$refresh_token

# Return the tokens
session_data <- list(
  access_token = access_token,
  refresh_token = refresh_token,
  device_token = device_token
)
return(session_data)

} else {
stop("Authentication failed. Check your credentials or MFA setup.")
}
}

Example usage:

Replace with your own credentials and TOTP secret

username <- "your_username"
password <- "your_password"
totp_secret <- Sys.getenv("mfa_code") # Ensure the environment variable is set

Authenticate and retrieve session data

session_data <- RobinHoodAuth(username, password, totp_secret)
print(session_data)

@AusHar AusHar mentioned this pull request Jan 7, 2025
@Gates8911
Copy link

Already posted a fix, however i realized after trying a few different security methods that not all of them were working, this script should work regardless of security settings within robinhood itself, if you are prompted via device approval method within app then the code input does not matter, i simply type 0000 however it may also send text message or sms on some accounts in which case the input prompt obviously does matter. Hope this helps someone. Also the "secrets" code generation method is more secure than the "random" package method ive heard.

"""Contains all functions for the purpose of logging in and out to Robinhood."""
import getpass
import os
import pickle
import secrets
import time

from robin_stocks.robinhood.helper import *
from robin_stocks.robinhood.urls import *

def generate_device_token():
    """Generates a cryptographically secure device token."""
    rands = [secrets.randbelow(256) for _ in range(16)]
    hexa = [str(hex(i + 256)).lstrip("0x")[1:] for i in range(256)]
    token = ""
    for i, r in enumerate(rands):
        token += hexa[r]
        if i in [3, 5, 7, 9]:
            token += "-"
    return token

def respond_to_challenge(challenge_id, sms_code):
    """This function will post to the challenge url.
    :param challenge_id: The challenge id.
    :type challenge_id: str
    :param sms_code: The sms code.
    :type sms_code: str
    :returns:  The response from requests.
    """
    url = challenge_url(challenge_id)
    payload = {
        'response': sms_code
    }
    return(request_post(url, payload))

def _validate_sherrif_id(device_token:str, workflow_id:str,mfa_code:str):
    url = "https://api.robinhood.com/pathfinder/user_machine/"
    machine_payload = {
        'device_id': device_token,
        'flow': 'suv',
        'input': {'workflow_id': workflow_id}
    }
    data = request_post(url=url, payload=machine_payload,json=True)
    machine_id = _get_sherrif_challenge(data)
    inquiries_url = f"https://api.robinhood.com/pathfinder/inquiries/{machine_id}/user_view/"
    response = request_get(inquiries_url)
    challenge_id = response["context"]["sheriff_challenge"]["id"] # used to be type_context
    challenge_url = f"https://api.robinhood.com/challenge/{challenge_id}/respond/" 
    challenge_payload = {'response': mfa_code}
    challenge_response = request_post(url=challenge_url, payload=challenge_payload)
    start_time = time.time()
    while time.time() - start_time < 120: # 2 minutes
        time.sleep(5)
        email_text_code = input("Prompt for text or email code (if prompt sent via robinhood app set this to 0000 after verifying):")
        challenge_payload['response'] = email_text_code
        challenge_response = request_post(url=challenge_url, payload=challenge_payload)
        inquiries_payload = {"sequence":0,"user_input":{"status":"continue"}}
        inquiries_response = request_post(url=inquiries_url, payload=inquiries_payload,json=True)
        if inquiries_response["type_context"]["result"] == "workflow_status_approved":
            print("login successful")
            return
        else:
            raise Exception("workflow status not approved")
    raise Exception("Login confirmation timed out. Please try again.")

def login(username=None, password=None, expiresIn=86400, scope='internal', by_sms=True, store_session=True, mfa_code=None, pickle_path="", pickle_name=""):
    """This function will effectively log the user into robinhood by getting an
    authentication token and saving it to the session header. By default, it
    will store the authentication token in a pickle file and load that value
    on subsequent logins.
    :param username: The username for your robinhood account, usually your email.
        Not required if credentials are already cached and valid.
    :type username: Optional[str]
    :param password: The password for your robinhood account. Not required if
        credentials are already cached and valid.
    :type password: Optional[str]
    :param expiresIn: The time until your login session expires. This is in seconds.
    :type expiresIn: Optional[int]
    :param scope: Specifies the scope of the authentication.
    :type scope: Optional[str]
    :param by_sms: Specifies whether to send an email(False) or an sms(True)
    :type by_sms: Optional[boolean]
    :param store_session: Specifies whether to save the log in authorization
        for future log ins.
    :type store_session: Optional[boolean]
    :param mfa_code: MFA token if enabled.
    :type mfa_code: Optional[str]
    :param pickle_path: Allows users to specify the path of the pickle file.
        Accepts both relative and absolute paths.
    :param pickle_name: Allows users to name Pickle token file in order to switch
        between different accounts without having to re-login every time.
    :returns:  A dictionary with log in information. The 'access_token' keyword contains the access token, and the 'detail' keyword \
    contains information on whether the access token was generated or loaded from pickle file.
    """
    device_token = generate_device_token()
    home_dir = os.path.expanduser("~")
    data_dir = os.path.join(home_dir, ".tokens")
    if pickle_path:
        if not os.path.isabs(pickle_path):
            # normalize relative paths
            pickle_path = os.path.normpath(os.path.join(os.getcwd(), pickle_path))
        data_dir = pickle_path
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    creds_file = "robinhood" + pickle_name + ".pickle"
    pickle_path = os.path.join(data_dir, creds_file)
    # Challenge type is used if not logging in with two-factor authentication.
    if by_sms:
        challenge_type = "sms"
    else:
        challenge_type = "email"
    url = login_url()
    login_payload = {
        'client_id': 'c82SH0WZOsabOXGP2sxqcj34FxkvfnWRZBKlBjFS',
        'expires_in': expiresIn,
        'grant_type': 'password',
        'password': password,
        'scope': scope,
        'username': username,
        'challenge_type': challenge_type,
        'device_token': device_token,
        'try_passkeys': False,
        'token_request_path':'/login',
        'create_read_only_secondary_token':True,
    }
    if mfa_code:
        login_payload['mfa_code'] = mfa_code
    # If authentication has been stored in pickle file then load it. Stops login server from being pinged so much.
    if os.path.isfile(pickle_path):
        # If store_session has been set to false then delete the pickle file, otherwise try to load it.
        # Loading pickle file will fail if the acess_token has expired.
        if store_session:
            try:
                with open(pickle_path, 'rb') as f:
                    pickle_data = pickle.load(f)
                    access_token = pickle_data['access_token']
                    token_type = pickle_data['token_type']
                    refresh_token = pickle_data['refresh_token']
                    # Set device_token to be the original device token when first logged in.
                    pickle_device_token = pickle_data['device_token']
                    login_payload['device_token'] = pickle_device_token
                    # Set login status to True in order to try and get account info.
                    set_login_state(True)
                    update_session(
                        'Authorization', '{0} {1}'.format(token_type, access_token))
                    # Try to load account profile to check that authorization token is still valid.
                    res = request_get(
                        positions_url(), 'pagination', {'nonzero': 'true'}, jsonify_data=False)
                    # Raises exception if response code is not 200.
                    res.raise_for_status()
                    return({'access_token': access_token, 'token_type': token_type,
                            'expires_in': expiresIn, 'scope': scope, 'detail': 'logged in using authentication in {0}'.format(creds_file),
                            'backup_code': None, 'refresh_token': refresh_token})
            except:
                print(
                    "ERROR: There was an issue loading pickle file. Authentication may be expired - logging in normally.", file=get_output())
                set_login_state(False)
                update_session('Authorization', None)
        else:
            os.remove(pickle_path)
    # Try to log in normally.
    if not username:
        username = input("Robinhood username: ")
        login_payload['username'] = username
    if not password:
        password = getpass.getpass("Robinhood password: ")
        login_payload['password'] = password
    data = request_post(url, login_payload)
    # Handle case where mfa or challenge is required.
    if data:
        if 'mfa_required' in data:
            mfa_token = input("Please type in the MFA code: ")
            login_payload['mfa_code'] = mfa_token
            res = request_post(url, login_payload, jsonify_data=False)
            while (res.status_code != 200):
                mfa_token = input(
                    "That MFA code was not correct. Please type in another MFA code: ")
                login_payload['mfa_code'] = mfa_token
                res = request_post(url, login_payload, jsonify_data=False)
            data = res.json()
        elif 'challenge' in data:
            challenge_id = data['challenge']['id']
            sms_code = input('Enter Robinhood code for validation: ')
            res = respond_to_challenge(challenge_id, sms_code)
            while 'challenge' in res and res['challenge']['remaining_attempts'] > 0:
                sms_code = input('That code was not correct. {0} tries remaining. Please type in another code: '.format(
                    res['challenge']['remaining_attempts']))
                res = respond_to_challenge(challenge_id, sms_code)
            update_session(
                'X-ROBINHOOD-CHALLENGE-RESPONSE-ID', challenge_id)
            data = request_post(url, login_payload)
        elif 'verification_workflow' in data:
            print("Verification workflow required. Please check your Robinhood app for instructions.")
            workflow_id = data['verification_workflow']['id']
            _validate_sherrif_id(device_token=device_token, workflow_id=workflow_id, mfa_code=mfa_code) 
            data = request_post(url, login_payload)
        # Update Session data with authorization or raise exception with the information present in data.
        if 'access_token' in data:
            token = '{0} {1}'.format(data['token_type'], data['access_token'])
            update_session('Authorization', token)
            set_login_state(True)
            data['detail'] = "logged in with brand new authentication code."
            if store_session:
                with open(pickle_path, 'wb') as f:
                    pickle.dump({'token_type': data['token_type'],
                                 'access_token': data['access_token'],
                                 'refresh_token': data['refresh_token'],
                                 'device_token': login_payload['device_token']}, f)
        
        else:
            if 'detail' in data:
                raise Exception(data['detail'])
            raise Exception(f"Received an error response {data}")
    else:
        raise Exception('Error: Trouble connecting to robinhood API. Check internet connection.')
    return(data)

def _get_sherrif_challenge(data):
    if "id" in data:
        return data["id"]
    raise Exception("Id not returned in user-machine call")

@login_required
def logout():
    """Removes authorization from the session header.
    :returns: None
    """
    set_login_state(False)
    update_session('Authorization', None)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.