Skip to content

Commit

Permalink
Merge pull request #1857 from bassem232/signup
Browse files Browse the repository at this point in the history
Signup
  • Loading branch information
subzeroid authored Apr 13, 2024
2 parents d090587 + 29acb7b commit 4b40759
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 1 deletion.
7 changes: 7 additions & 0 deletions instagrapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
from instagrapi.mixins.insights import InsightsMixin
from instagrapi.mixins.location import LocationMixin
from instagrapi.mixins.media import MediaMixin
from instagrapi.mixins.password import PasswordMixin
from instagrapi.mixins.photo import DownloadPhotoMixin, UploadPhotoMixin
from instagrapi.mixins.private import PrivateRequestMixin
from instagrapi.mixins.public import (ProfilePublicMixin, PublicRequestMixin,
TopSearchesPublicMixin)
from instagrapi.mixins.signup import SignUpMixin
from instagrapi.mixins.multiple_accounts import MultipleAccountsMixin
from instagrapi.mixins.note import NoteMixin
from instagrapi.mixins.notification import NotificationMixin
Expand Down Expand Up @@ -78,6 +84,7 @@ class Client(
CommentMixin,
StoryMixin,
PasswordMixin,
SignUpMixin,
DownloadClipMixin,
UploadClipMixin,
ReelsMixin,
Expand Down
217 changes: 217 additions & 0 deletions instagrapi/mixins/signup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import base64
import random
import time

from datetime import datetime
from uuid import uuid4

from instagrapi.extractors import extract_user_short
from instagrapi.types import UserShort


CHOICE_EMAIL = 1


class SignUpMixin:
waterfall_id = str(uuid4())
adid = str(uuid4())
wait_seconds = 5

def signup(
self,
username: str,
password: str,
email: str,
phone_number: str,
full_name: str = '',
year: int = None,
month: int = None,
day: int = None
) -> UserShort:
self.get_signup_config()
check = self.check_email(email)
assert check.get('valid'), f'Email not valid ({check})'
assert check.get('available'), f'Email not available ({check})'
sent = self.send_verify_email(email)
assert sent.get('email_sent'), 'Email not sent ({sent})'
# send code confirmation
code = ""
for attempt in range(1, 11):
code = self.challenge_code_handler(username, CHOICE_EMAIL)
if code:
break
time.sleep(self.wait_seconds * attempt)
print(
f'Enter code "{code}" for {username} '
f'({attempt} attempts, by {self.wait_seconds} seconds)'
)
signup_code = self\
.check_confirmation_code(email, code)\
.get('signup_code')
retries = 0
kwargs = {
'username': username,
'password': password,
'email': email,
'signup_code': signup_code,
'full_name': full_name,
'year': year,
'month': month,
'day': day
}
while retries < 3:
data = self.accounts_create(**kwargs)
if data.get('message') != 'challenge_required':
break
if self.challenge_flow(data['challenge']):
kwargs.update({
"suggestedUsername": "",
"sn_result": "MLA"
})
retries += 1
return extract_user_short(data['created_user'])

def get_signup_config(self) -> dict:
return self.private_request("consent/get_signup_config/", params={
"guid": self.uuid,
"main_account_selected": False
})

def check_email(self, email) -> dict:
"""Check available (free, not registred) email
"""
return self.private_request("users/check_email/", {
"android_device_id": self.device_id,
"login_nonce_map": "{}",
"login_nonces": "[]",
"email": email,
"qe_id": str(uuid4()),
"waterfall_id": self.waterfall_id
})

def send_verify_email(self, email) -> dict:
"""Send request to receive code to email
"""
return self.private_request("accounts/send_verify_email/", {
"phone_id": self.phone_id,
"device_id": self.device_id,
"email": email,
"waterfall_id": self.waterfall_id,
"auto_confirm_only": "false"
})

def check_confirmation_code(self, email, code) -> dict:
"""Enter code from email
"""
return self.private_request("accounts/check_confirmation_code/", {
"code": code,
"device_id": self.device_id,
"email": email,
"waterfall_id": self.waterfall_id
})

def check_age_eligibility(self, year, month, day):
return self.private.post(
"consent/check_age_eligibility/", data={
'_csrftoken': self.token,
'day': day,
'year': year,
'month': month
}
).json()

def accounts_create(
self,
username: str,
password: str,
email: str,
signup_code: str,
full_name: str = '',
year: int = None,
month: int = None,
day: int = None,
**kwargs
) -> dict:
timestamp = datetime.now().strftime('%s')
nonce = f'{username}|{timestamp}|\xb9F"\x8c\xa2I\xaaz|\xf6xz\x86\x92\x91Y\xa5\xaa#f*o%\x7f'
sn_nonce = base64.encodebytes(nonce.encode()).decode().strip()
data = {
"is_secondary_account_creation": "true",
"jazoest": str(int(random.randint(22300, 22399))), # "22341",
"tos_version": "row",
"suggestedUsername": "sn_result",
"do_not_auto_login_if_credentials_match": "false",
"phone_id": self.phone_id,
"enc_password": self.password_encrypt(password),
"username": str(username),
"first_name": str(full_name),
"day": str(day),
"adid": self.adid,
"guid": self.uuid,
"year": str(year),
"device_id": self.device_id,
"_uuid": self.uuid,
"email": email,
"month": str(month),
"sn_nonce": sn_nonce,
"force_sign_up_code": signup_code,
"waterfall_id": self.waterfall_id,
"password": password,
"one_tap_opt_in": "true",
**kwargs
}
return self.private_request("accounts/create/", data)

def challenge_flow(self, data):
data = self.challenge_api(data)
while True:
if data.get('message') == 'challenge_required':
data = self.challenge_captcha(data['challenge'])
continue
elif data.get('challengeType') == 'SubmitPhoneNumberForm':
data = self.challenge_submit_phone_number(data)
continue
elif data.get('challengeType') == 'VerifySMSCodeFormForSMSCaptcha':
data = self.challenge_verify_sms_captcha(data)
continue

def challenge_api(self, data):
resp = self.private.get(
f"https://i.instagram.com/api/v1{data['api_path']}",
params={
"guid": self.uuid,
"device_id": self.device_id,
"challenge_context": data['challenge_context']
}
)
return resp.json()

def challenge_captcha(self, data):
g_recaptcha_response = self.captcha_resolve()
resp = self.private.post(
f"https://i.instagram.com{data['api_path']}",
data={'g-recaptcha-response': g_recaptcha_response}
)
return resp.json()

def challenge_submit_phone_number(self, data, phone_number):
api_path = data.get('navigation', {}).get('forward')
resp = self.private.post(
f"https://i.instagram.com{api_path}",
data={
"phone_number": phone_number,
"challenge_context": data['challenge_context']
}
)
return resp.json()

def challenge_verify_sms_captcha(self, data, security_code):
api_path = data.get('navigation', {}).get('forward')
resp = self.private.post(
f"https://i.instagram.com{api_path}",
data={
"security_code": security_code,
"challenge_context": data['challenge_context']
}
)
return resp.json()
1 change: 0 additions & 1 deletion instagrapi/story.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
except ImportError:
raise Exception("You don't have PIL installed. Please install PIL or Pillow>=8.1.1")


class StoryBuilder:
"""
Helpers for Story building
Expand Down
34 changes: 34 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathlib import Path

from instagrapi import Client
from instagrapi.utils import gen_password
from instagrapi.exceptions import (
BadCredentials,
DirectThreadNotFound,
Expand Down Expand Up @@ -1242,6 +1243,39 @@ def test_location_medias_recent(self):
self.assertIsInstance(medias[0], Media)


class SignUpTestCase(unittest.TestCase):

def test_password_enrypt(self):
cl = Client()
enc_password = cl.password_encrypt('test')
parts = enc_password.split(':')
self.assertEqual(parts[0], '#PWD_INSTAGRAM')
self.assertEqual(parts[1], '4')
self.assertTrue(int(parts[2]) > 1607612345)
self.assertTrue(len(parts[3]) == 392)

def test_signup(self):
cl = Client()
username = gen_password()
password = gen_password(12, symbols=True)
email = f'{username}@gmail.com'
phone_number = os.environ.get("IG_PHONE_NUMBER")
full_name = f'John {username}'
user = cl.signup(
username, password, email, phone_number, full_name,
year=random.randint(1980, 1990),
month=random.randint(1, 12),
day=random.randint(1, 30)
)
self.assertIsInstance(user, UserShort)
for key, val in {
"username": username,
"full_name": full_name
}.items():
self.assertEqual(getattr(user, key), val)
self.assertTrue(user.profile_pic_url.startswith("https://"))


class ClientHashtagTestCase(ClientPrivateTestCase):
REQUIRED_MEDIA_FIELDS = [
"pk",
Expand Down

0 comments on commit 4b40759

Please sign in to comment.