Auto-update: Wed Jun 26 10:08:32 PDT 2024

This commit is contained in:
sanj 2024-06-26 10:08:32 -07:00
parent b6ae0a8e31
commit a0fc2c6d77
2 changed files with 162 additions and 33 deletions

37
ig.py
View file

@ -94,10 +94,10 @@ args = parser.parse_args()
### FOLDER & PATH SETUP ###
###########################
PROFILE = args.profile
VISION_DIR = "/Users/sij/AI/Vision" # CHANGE ME!
SD_DIR = "/Users/sij/workshop/sd" # CHANGE ME!
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WORKFLOWS_DIR = os.path.join(VISION_DIR, 'workflows')
RESULT_DIRECTORY = os.path.join(VISION_DIR, 'ComfyUI/output/')
WORKFLOWS_DIR = os.path.join(SD_DIR, 'workflows')
RESULT_DIRECTORY = os.path.join(SD_DIR, 'ComfyUI/output/')
PROFILES_DIR = os.path.join(BASE_DIR, 'profiles')
PROFILE_DIR = os.path.join(PROFILES_DIR, PROFILE)
PROFILE_IMAGES_DIR = os.path.join(PROFILE_DIR, 'images')
@ -112,15 +112,6 @@ if not os.path.exists(PROFILE_IMAGES_DIR):
OPENAI_API_KEY=PROFILE_CONFIG.get("openai_key")
def start_file_server():
command = [
"caddy", "file-server",
"--root", PROFILE_IMAGES_DIR,
"--listen", ":8190",
"--browse"
]
subprocess.Popen(command)
atexit.register(lambda: subprocess.call(["killall", "caddy"]))
###################
### VALIDATION ###
@ -154,9 +145,6 @@ if args.local is False and args.dalle is True:
IMG_GEN = OpenAI(api_key=OPENAI_API_KEY)
IMG_MODEL = "dall-e-3"
# else:
# start_file_server()
# IMG_GEN = SD(PROFILE_IMAGES_DIR, "")
SD_SERVER_ADDRESS = "http://localhost:8188"
CLIENT_ID = str(uuid.uuid4())
@ -608,7 +596,7 @@ def query_ollama(llmPrompt: List = [], system_msg: str = "", user_msg: str = "",
messages = llmPrompt if llmPrompt else [
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg}]
response = LLM.chat(model="mixtral", messages=messages, options={"num_predict": max_tokens})
response = LLM.chat(model="mistral", messages=messages, options={"num_predict": max_tokens})
logger.debug(response)
if "message" in response:
@ -892,23 +880,6 @@ def poll_status(prompt_id):
return job_data
time.sleep(1)
def poll_status(prompt_id):
"""Poll the job status until it's complete and return the status data."""
start_time = time.time() # Record the start time
while True:
elapsed_time = int(time.time() - start_time) # Calculate elapsed time in seconds
status_response = requests.get(f"{SD_SERVER_ADDRESS}/history/{prompt_id}")
# Use \r to return to the start of the line, and end='' to prevent newline
print(f"\rGenerating {prompt_id}. Elapsed time: {elapsed_time} seconds", end='')
if status_response.status_code != 200:
raise Exception("Failed to get job status")
status_data = status_response.json()
job_data = status_data.get(prompt_id, {})
if job_data.get("status", {}).get("completed", False):
print()
print(f"{prompt_id} completed in {elapsed_time} seconds.")
return job_data
time.sleep(1)
################################
### PRIMARY ACTIVE FUNCTIONS ###

158
ig_messages.py Normal file
View file

@ -0,0 +1,158 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import time
import pytesseract
from PIL import Image
import yaml
import logging as log
log.basicConfig(
filename="log.txt",
filemode="a",
level=log.INFO,
format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
datefmt="%Y-%m-%d %H:%M:%S"
)
MAX_TIMEOUT = 7
def driver_init():
mobile_emulation = {
"deviceMetrics": { "width": 768, "height": 1024, "pixelRatio": 2 },
"userAgent": "Mozilla/5.0 (iPad; CPU OS 11_0 like Mac OS X) AppleWebKit/604.1.34 (KHTML, like Gecko) Version/11.0 Mobile/15A5341f Safari/604.1"
}
options = webdriver.ChromeOptions()
options.add_experimental_option("mobileEmulation", mobile_emulation)
# options.add_argument("--headless") # Uncomment to run headless
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
return driver
def read_files():
try:
with open("user_config.yaml", "r") as user_stream:
user_config = yaml.safe_load(user_stream)
with open("message_config.yaml", "r") as message_stream:
message_config = yaml.safe_load(message_stream)
# Example of validating expected keys:
if "username" not in user_config or "password" not in user_config:
log.error("User configuration file is missing username or password")
exit(1)
if "username" not in message_config or "message" not in message_config:
log.error("Message configuration file is missing target username or message")
exit(1)
return user_config, message_config
except FileNotFoundError as e:
log.error(f"Configuration file not found: {e}")
exit(1)
except yaml.YAMLError as exc:
log.error(f"Error parsing YAML file: {exc}")
exit(1)
def init(driver):
driver.get("https://www.instagram.com")
log.info("Directed to instagram.com")
def login(driver, user_config):
username = user_config["username"]
password = user_config["password"]
username_input = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//input[@name='username']")))
username_input.send_keys(username)
password_input = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//input[@name='password']")))
password_input.send_keys(password)
login_button = WebDriverWait(driver, MAX_TIMEOUT).until(EC.presence_of_element_located((By.XPATH, "//button[@type='submit']//div[text()='Log in']")))
login_button.click()
log.info("Login info entered, login button clicked")
def accept_cookies(driver):
try:
cookie_button = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[text()='Allow all cookies']")))
cookie_button.click()
log.info("Cookies accepted")
WebDriverWait(driver, MAX_TIMEOUT).until(EC.invisibility_of_element_located((By.XPATH, "//button[text()='Allow all cookies']")))
except NoSuchElementException as exc:
log.warn("Accept cookies not needed")
def redirect_to_user_profile(driver, message_config):
driver.get("https://www.instagram.com/" + message_config["username"])
log.info("Redirected to user profile (instagram.com/" + message_config["username"] + ")")
def click_on_message(driver):
try:
message_button = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@role='button' and text()='Message']")))
message_button.click()
log.info("Clicked on \"Message\" button")
except NoSuchElementException as exc:
log.error("You don't have the necessary rights to message this user or there is no user with this username.")
def perform_ocr(image_path):
image = Image.open(image_path)
text = pytesseract.image_to_string(image)
return text
def click_on_message_and_capture(driver):
try:
message_button = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@role='button' and text()='Message']")))
message_button.click()
log.info("Clicked on 'Message' button")
time.sleep(5) # Wait for the message page to fully load or animations to complete
driver.save_screenshot("message_page.png")
log.info("Screenshot of the message page taken")
except NoSuchElementException as exc:
log.error("Message button not found or unable to capture the screen")
def not_save_info(driver):
not_save_login_info = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//div[@role='button' and text()='Not Now']")))
not_save_login_info.click()
log.info("Saving info declined")
def decline_notifications(driver):
try:
notifications = WebDriverWait(driver, MAX_TIMEOUT).until(EC.element_to_be_clickable((By.XPATH, "//button[text()='Not Now']")))
notifications.click()
log.info("Declined notifications")
except NoSuchElementException as exc:
log.warn("Declining notifications not needed")
def send_message(driver, message_config):
send_message_textbox = WebDriverWait(driver, MAX_TIMEOUT).until(EC.presence_of_element_located((By.XPATH, "//div[@role='textbox' and @aria-label='Message']")))
send_message_textbox.send_keys(message_config["message"])
WebDriverWait(driver, MAX_TIMEOUT).until(EC.presence_of_element_located((By.XPATH, "//div[@role='button' and text()='Send']")))
driver.find_element(By.XPATH, "//div[@role='button' and text()='Send']").click()
log.info("Message \"" + message_config["message"] + "\" sent successfully to user " + message_config["username"] + "\n\n")
if __name__ == "__main__":
driver = driver_init()
user_config, message_config = read_files()
init(driver)
accept_cookies(driver)
login(driver, user_config)
not_save_info(driver)
decline_notifications(driver)
redirect_to_user_profile(driver, message_config)
click_on_message_and_capture(driver)
ocr_text = perform_ocr("message_page.png")
print(ocr_text)
driver.close()