#!/Users/sij/miniforge3/envs/instabot/bin/python
import os
import io
from io import BytesIO
import copy
import re
import jwt
import json
from tqdm import tqdm
import pyotp
import time
import pytz
import requests
import tempfile
import random
import subprocess
import atexit
import urllib.parse
import urllib.request
import uuid
import base64
from time import sleep
from datetime import timedelta, datetime as date
from PIL import Image
from typing import Dict, List
from instagrapi import Client as igClient
from instagrapi.types import UserShort
from urllib.parse import urlparse
from instagrapi.exceptions import LoginRequired as ClientLoginRequiredError
from openai import OpenAI
import argparse
from ollama import Client as oLlama
from SD import SD
from dotenv import load_dotenv
###########################
### .env INITIALIZATION ###
###########################
load_dotenv()
GHOST_API_KEY=os.getenv('GHOST_API_KEY')
GHOST_CONTENT_KEY=os.getenv('GHOST_CONTENT_KEY')
OPENAI_API_KEY=os.getenv('OPENAI_API_KEY')
GHOST_API_URL=os.getenv('GHOST_API_URL')
########################
### ARGUMENT PARSING ###
########################
parser = argparse.ArgumentParser(description='Instagram bot for generating images and engaging with other users.')
parser.add_argument('--account', type=str, help='Select which Instagram account to act as.', default='simulacrai')
parser.add_argument('--newsession', action='store_true', help='Start a new session.', default=False)
parser.add_argument('--shortsleep', type=int, help='Average short sleep time in seconds.', default=5)
parser.add_argument('--longsleep', type=int, help='Average long sleep time in seconds.', default=180)
parser.add_argument('--user', type=str, help='Instagram username to fetch media from and comment on.', default='')
parser.add_argument('--hashtag', type=str, help='Hashtag to fetch media from and comment on.', default='')
parser.add_argument('--image_url', type=str, help='URL to a specific Instagram image to comment on.', default='')
parser.add_argument('--count', type=int, help='Number of media items to interact with.', default=1)
parser.add_argument('--commenttype', type=str, help='Type of comments to make.', default='')
parser.add_argument('--commentmode', type=str, help='Mode of commenting.', default='recent')
parser.add_argument('--posttype', type=str, help='Generate images of a specific type.', default=None)
parser.add_argument('--postfile', type=str, help='Generate post from an existing image file.', default=None)
parser.add_argument('--commentonly', action='store_true', help='Only comment on media, do not generate images.', default=False)
parser.add_argument('--postonly', action='store_true', help='Only generate images and post to Instagram, do not comment.', default=False)
parser.add_argument('--noghost', action='store_true', help='Skips posting generated images to Ghost.', default=False)
parser.add_argument('--local', action='store_true', help='Do not post to Instagram, and disable OpenAI', default=False)
parser.add_argument('--openai', action='store_true', help='Allows the use of OpenaI for chat completions and vision', default=False)
parser.add_argument('--gpt4', action='store_true', help='Allows the use of OpenAI GPT-4 for chat completions', default=False)
parser.add_argument('--ollama', action='store_true', help='Allows the use of Ollama for chat completions', default=False)
parser.add_argument('--llava', action='store_true', help='Allows the use of Ollama Llava for image recognition', default=False)
parser.add_argument('--dalle', action='store_true', help='Allows the use of OpenAI DALL-E3 for image generation', default=False)
parser.add_argument('--wallpaper', action='store_true', help='Set the macOS wallpaper to use the generated images', default=False)
parser.add_argument('--fast', action='store_true', help='Appends _fast to workflow names, which are expedited workflows that generally forego ultimate upscaling.', default=False)
args = parser.parse_args()
###########################
### FOLDER & PATH SETUP ###
###########################
ACCOUNT = args.account
VISION_DIR = "/Users/sij/AI/Vision"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WORKFLOWS_DIR = os.path.join(VISION_DIR, 'workflows')
RESULT_DIRECTORY = os.path.join(VISION_DIR, 'ComfyUI/output/')
ACCOUNT_DIR = os.path.join(BASE_DIR, ACCOUNT)
ACCOUNT_IMAGES_DIR = os.path.join(ACCOUNT_DIR, 'images')
ACCOUNT_CONFIG_PATH = os.path.join(ACCOUNT_DIR, 'config.json')
DOWNLOADED_IMAGES_PATH = os.path.join(ACCOUNT_DIR, 'downloads')

with open(ACCOUNT_CONFIG_PATH, 'r') as config_file:
    ACCOUNT_CONFIG = json.load(config_file)

if not os.path.exists(ACCOUNT_IMAGES_DIR):
    os.makedirs(ACCOUNT_IMAGES_DIR)
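# Serve the generated-images directory over HTTP on port 8190 via Caddy (presumably so the
# images can be fetched by URL); the server is killed when this script exits.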
def start_file_server():
command = [
"caddy", "file-server",
"--root", ACCOUNT_IMAGES_DIR,
"--listen", ":8190",
"--browse"
]
subprocess.Popen(command)
atexit.register(lambda: subprocess.call(["killall", "caddy"]))
###################
### VALIDATION ###
##################
if args.account and args.posttype and args.posttype not in ACCOUNT_CONFIG["posts"]:
    print("ERROR: NO SUCH POST TYPE IS AVAILABLE FOR THIS ACCOUNT.")
if args.account and args.commenttype and args.commenttype not in ACCOUNT_CONFIG["comments"]:
    print("ERROR: NO SUCH COMMENT TYPE IS AVAILABLE FOR THIS ACCOUNT.")
####################
### CLIENT SETUP ###
####################
cl = igClient(request_timeout=1)
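# Backend selection: --openai routes chat (and vision) through the OpenAI API; otherwise a local
# Ollama instance at localhost:11434 is used. --dalle additionally enables DALL-E 3 for image
# generation; the local ComfyUI/Stable Diffusion path is configured below.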
if args.local is False and args.ollama is False and args.openai is True:
LLM = OpenAI(api_key=OPENAI_API_KEY)
else:
LLM = oLlama(host='http://localhost:11434')
if args.local is False and args.openai is True:
VISION_LLM = OpenAI(api_key=OPENAI_API_KEY)
else:
VISION_LLM = oLlama(host='http://localhost:11434')
if args.local is False and args.dalle is True:
IMG_GEN = OpenAI(api_key=OPENAI_API_KEY)
IMG_MODEL = "dall-e-3"
# else:
# start_file_server()
# IMG_GEN = SD(ACCOUNT_IMAGES_DIR, "")
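# Address of the local ComfyUI server used for Stable Diffusion workflow-based generation.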
SD_SERVER_ADDRESS = "http://localhost:8188"
CLIENT_ID = str(uuid.uuid4())
###############################
### INSTAGRAM & GHOST SETUP ###
###############################
IG_USERNAME = ACCOUNT_CONFIG.get("ig_name")
IG_PASSWORD = ACCOUNT_CONFIG.get("ig_pass")
IG_SECRET_KEY = ACCOUNT_CONFIG.get("ig_2fa_secret")
IG_SESSION_PATH = os.path.join(ACCOUNT_DIR, 'credentials.json')
########################
### LLM PROMPT SETUP ###
########################
IMG_PROMPT_SYS = ACCOUNT_CONFIG.get("img_prompt_sys")
IMG_DESCRIPTION_SYS = ACCOUNT_CONFIG.get("img_description_sys")
COMMENT_PROMPT_SYS = ACCOUNT_CONFIG.get("img_comment_sys")
HASHTAGS = ACCOUNT_CONFIG.get("preferred_hashtags", [])
IMAGE_URL = args.image_url
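# Reference timestamp used in the main block to estimate the position within the current
# 30-second TOTP window before attempting an Instagram login.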
rollover_time = 1702605780
COMPLETED_MEDIA_LOG = os.path.join(ACCOUNT_DIR, 'completed-media.txt')
TOTP = pyotp.TOTP(IG_SECRET_KEY)
SHORT = args.shortsleep
LONG = args.longsleep
#######################
### ⚠️ FUNCTIONS ⚠️ ###
#######################
########################
### SOCIAL FUNCTIONS ###
########################
def follow_by_username(username) -> bool:
"""
Follow a user
Parameters
----------
username: str
Username for an Instagram account
Returns
-------
bool
A boolean value
"""
userid = cl.user_id_from_username(username)
sleep(SHORT)
return cl.user_follow(userid)
def unfollow_by_username(username) -> bool:
"""
Unfollow a user
Parameters
----------
username: str
Username for an Instagram account
Returns
-------
bool
A boolean value
"""
userid = cl.user_id_from_username(username)
sleep(SHORT)
return cl.user_unfollow(userid)
def get_followers(amount: int = 0) -> Dict[int, UserShort]:
"""
Get bot's followers
Parameters
----------
amount: int, optional
        Maximum number of followers to return, default is 0 - unlimited
Returns
-------
Dict[int, UserShort]
Dict of user_id and User object
"""
sleep(SHORT)
return cl.user_followers(cl.user_id, amount=amount)
def get_followers_usernames(amount: int = 0) -> List[str]:
"""
Get bot's followers usernames
Parameters
----------
amount: int, optional
        Maximum number of followers to return, default is 0 - unlimited
Returns
-------
List[str]
List of usernames
"""
followers = cl.user_followers(cl.user_id, amount=amount)
sleep(SHORT)
return [user.username for user in followers.values()]
def get_following(amount: int = 0) -> Dict[int, UserShort]:
"""
Get bot's followed users
Parameters
----------
amount: int, optional
        Maximum number of followed users to return, default is 0 - unlimited
Returns
-------
Dict[int, UserShort]
Dict of user_id and User object
"""
sleep(SHORT)
return cl.user_following(cl.user_id, amount=amount)
def get_user_media(username, amount=30):
"""
Fetch recent media for a given username.
Parameters
----------
username: str
Username for an Instagram account
amount: int, optional
        Number of media to fetch, default is 30
Returns
-------
List of medias
"""
print(f"Fetching recent media for {username}...")
user_id = cl.user_id_from_username(username)
medias = cl.user_medias(user_id, amount)
final_medias = []
for media in medias:
sleep(SHORT)
if media.media_type == 1:
final_medias.append(media)
return final_medias
def get_user_image_urls(username, amount=30) -> List[str]:
"""
Fetch recent media URLs for a given username.
Parameters
----------
username: str
Username for an Instagram account
amount: int, optional
        Number of media URLs to fetch, default is 30
Returns
-------
List[str]
List of media URLs
"""
print(f"Fetching recent media URLs for {username}...")
user_id = cl.user_id_from_username(username)
medias = cl.user_medias(user_id, amount)
urls = []
for media in medias:
sleep(SHORT)
if media.media_type == 1 and media.thumbnail_url: # Photo
urls.append(media.thumbnail_url)
# elif media.media_type == 2: # Video
# urls.append(media.video_url)
# elif media.media_type == 8: # Album
# for resource in media.resources:
# sleep(SHORT)
# if resource.media_type == 1:
# urls.append(resource.thumbnail_url)
# elif resource.media_type == 2:
# urls.append(resource.video_url)
return urls
def is_valid_url(url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def get_random_follower():
    # get_followers_usernames() is a module-level helper in this script, not a Client method.
    followers = get_followers_usernames()
    sleep(SHORT)
    return random.choice(followers)
def get_medias_by_hashtag(hashtag: str, days_ago_max:int = 14, ht_type:str = None, amount:int = args.count):
if not ht_type:
ht_type = args.commentmode
print(f"Fetching {ht_type} media for hashtag: {hashtag}")
ht_medias = []
while True:
sleep(SHORT)
if ht_type == "top":
ht_medias.extend(cl.hashtag_medias_top(name=hashtag, amount=amount*10))
elif ht_type == "recent":
ht_medias.extend(cl.hashtag_medias_recent(name=hashtag, amount=amount*10))
filtered_medias = filter_medias(ht_medias, days_ago_max=days_ago_max)
print(f"Filtered {ht_type} media count obtained for '#{hashtag}': {len(filtered_medias)}")
if len(filtered_medias) >= amount:
print(f"Desired amount of {amount} filtered media reached.")
break
    # Trim to the requested amount, mirroring get_medias_from_all_hashtags.
    return filtered_medias[:amount]
def get_medias_from_all_hashtags(days_ago_max=14, ht_type:str = None, amount:int = args.count):
if not ht_type:
ht_type = args.commentmode
print(f"Fetching {ht_type} media.")
filtered_medias = []
while len(filtered_medias) < amount:
hashtag = random.choice(HASHTAGS)
print(f"Using hashtag: {hashtag}")
fetched_medias = []
sleep(SHORT)
# Fetch medias based on the hashtag type
if ht_type == "top":
fetched_medias = cl.hashtag_medias_top(name=hashtag, amount=50) # Fetch a large batch to filter from
elif ht_type == "recent":
fetched_medias = cl.hashtag_medias_recent(name=hashtag, amount=50) # Same for recent
current_filtered_medias = filter_medias(fetched_medias, days_ago_max=days_ago_max)
filtered_medias.extend(current_filtered_medias)
print(f"Filtered {ht_type} media count obtained for '#{hashtag}': {len(current_filtered_medias)}")
# Trim the list if we've collected more than needed
if len(filtered_medias) > amount:
filtered_medias = filtered_medias[:amount]
print(f"Desired amount of {amount} filtered media reached.")
break
else:
print(f"Total filtered media count so far: {len(filtered_medias)}")
return filtered_medias
def filter_medias(
medias: List,
like_count_min=None,
like_count_max=None,
comment_count_min=None,
comment_count_max=None,
days_ago_max=None,
):
# Adjust to use your preferred timezone, for example, UTC
days_back = date.now(pytz.utc) - timedelta(days=days_ago_max) if days_ago_max else None
return [
media for media in medias
if (
(like_count_min is None or media.like_count >= like_count_min) and
(like_count_max is None or media.like_count <= like_count_max) and
(comment_count_min is None or media.comment_count >= comment_count_min) and
(comment_count_max is None or media.comment_count <= comment_count_max) and
(days_ago_max is None or (media.taken_at and media.taken_at > days_back)) and not
check_media_in_completed_lists(media)
)
]
def add_media_to_completed_lists(media):
"""
Add a media to the completed lists after interacting with it.
"""
with open(COMPLETED_MEDIA_LOG, 'a') as file:
file.write(f"{str(media.pk)}\n")
def check_media_in_completed_lists(media):
    """
    Check if a media is in the completed lists.
    """
    # The log may not exist on a first run; treat that as "nothing completed yet".
    if not os.path.exists(COMPLETED_MEDIA_LOG):
        return False
    with open(COMPLETED_MEDIA_LOG, 'r') as file:
        completed_media = file.read().splitlines()
    return str(media.pk) in completed_media
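# Download an image URL to download_path (or a temp file if the path is missing or invalid),
# shrink it so neither dimension exceeds max_dimension, and return the local path, or None on failure.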
def download_and_resize_image(url: str, download_path: str = None, max_dimension: int = 1200) -> str:
if not isinstance(url, str):
url = str(url)
parsed_url = urlparse(url)
if not download_path or not os.path.isdir(os.path.dirname(download_path)):
# Use a temporary file if download_path is not specified or invalid
_, temp_file_extension = os.path.splitext(parsed_url.path)
if not temp_file_extension:
temp_file_extension = ".jpg" # Default extension if none is found
download_path = tempfile.mktemp(suffix=temp_file_extension, prefix="download_")
if url and parsed_url.scheme and parsed_url.netloc:
try:
os.makedirs(os.path.dirname(download_path), exist_ok=True)
with requests.get(url) as response:
response.raise_for_status() # Raises an HTTPError if the response was an error
image = Image.open(BytesIO(response.content))
# Resize the image, preserving aspect ratio
if max(image.size) > max_dimension:
image.thumbnail((max_dimension, max_dimension))
# Save the image, preserving the original format if possible
image_format = image.format if image.format else "JPEG"
image.save(download_path, image_format)
return download_path
except Exception as e:
# Handle or log the error as needed
print(f"Error downloading or resizing image: {e}")
return None
###############################
### ACTIVE SOCIAL FUNCTIONS ###
###############################
def comment_on_user_media(user: str, comment_type: str = "default", amount=5):
"""
Comment on a user's media.
"""
comment_prompt_usr = ACCOUNT_CONFIG['comments'][comment_type]['img_comment_usr']
medias = get_user_media(user, amount)
for media in medias:
if not check_media_in_completed_lists(media):
sleep(SHORT)
if media.thumbnail_url and is_valid_url(media.thumbnail_url):
media_path = download_and_resize_image(media.thumbnail_url, f"{DOWNLOADED_IMAGES_PATH}/{media.pk}.jpg")
if media_path is not None:
encoded_media = encode_image_to_base64(media_path)
comment_text = llava(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr) if args.llava or not args.openai else gpt4v(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr)
if comment_text:
cl.media_comment(media.pk, comment_text)
print(f"Commented on media: {media.pk}")
else:
print(f"Failed to generate comment for media: {media.pk}")
add_media_to_completed_lists(media)
sleep(SHORT)
else:
print(f"We received a nonetype! {media_path}")
else:
print(f"URL for {media.pk} disappeared it seems...")
else:
print(f"Media already interacted with: {media.pk}")
def comment_by_type(comment_type: str = args.commenttype, amount=3, hashtag: str = None):
"""
Comment on a hashtag's media.
"""
if not hashtag:
hashtag = random.choice(ACCOUNT_CONFIG['comments'][comment_type]['hashtags'])
medias = get_medias_by_hashtag(hashtag=hashtag, days_ago_max=7, amount=amount)
for media in medias:
if not check_media_in_completed_lists(media):
media_path = download_and_resize_image(media.thumbnail_url, f"{DOWNLOADED_IMAGES_PATH}/{media.pk}.jpg")
comment_text = None
if media_path and os.path.exists(media_path):
encoded_media = encode_image_to_base64(media_path)
                comment_prompt_usr = ACCOUNT_CONFIG['comments'][comment_type]['img_comment_usr'] + " For reference, here is the description that was posted with this image: " + (media.caption_text or "")
                comment_text = llava(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr) if args.llava or not args.openai else gpt4v(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr)
if comment_text:
cl.media_comment(media.pk, comment_text)
print(f"Commented on media: https://instagram.com/p/{media.pk}/")
else:
print(f"Failed to generate comment for media: https://instagram.com/p/{media.pk}")
add_media_to_completed_lists(media)
sleep(SHORT)
else:
print(f"Media already interacted with: {media.pk}")
def comment_on_specific_media(media_url, comment_type: str = "default"):
"""
Comment on a specific media given its URL.
"""
media_id = cl.media_pk_from_url(media_url)
sleep(SHORT)
media = cl.media_info(media_id)
sleep(SHORT)
media_path = download_and_resize_image(media.thumbnail_url, f"{DOWNLOADED_IMAGES_PATH}/{media.pk}.jpg")
encoded_media = encode_image_to_base64(media_path)
    comment_prompt_usr = ACCOUNT_CONFIG['comments'][comment_type]['img_comment_usr'] + " For reference, here is the description that was posted with this image: " + (media.caption_text or "")
    comment_text = llava(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr) if args.llava or not args.openai else gpt4v(encoded_media, COMMENT_PROMPT_SYS, comment_prompt_usr)
if comment_text:
cl.media_comment(media.pk, comment_text)
print(f"Commented on specific media: https://instagram.com/p/{media.pk}/")
else:
print(f"Failed to generate comment for specific media: https://instagram.com/p/{media.pk}/")
#####################
### LLM FUNCTIONS ###
#####################
def query_llm(llmPrompt: List = [], system_msg: str = "", user_msg: str = "", max_tokens: int = 150):
    messages = llmPrompt if llmPrompt else [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg}
    ]
    response = LLM.chat.completions.create(
        model="gpt-4",
        messages=messages,
        max_tokens=max_tokens
    ) if args.openai else LLM.chat(
        model="mixtral",
        messages=messages,
        options={"num_predict": max_tokens}
    )
    print(response)
    if hasattr(response, "choices"):
        # OpenAI-style response object.
        first_choice = response.choices[0]
        if first_choice.message.content:
            return first_choice.message.content
        message_str = str(first_choice.message)
        match = re.search(r"content='(.*?)', role=", message_str, re.DOTALL)
        if match:
            return match.group(1)
        print("No content found in message string:", message_str)
        print("Trying again!")
        return query_llm(llmPrompt, system_msg, user_msg, max_tokens)
    elif isinstance(response, dict) and "message" in response:
        # Ollama-style response dict.
        if "content" in response["message"]:
            content = response["message"]["content"]
            print(f"Response: {content}")
            return content
    else:
        print("No choices found in response")
    return ""
def llava(image_base64, prompt_sys: str, prompt_usr: str = None):
    # Accepts either (image, prompt) or (image, system_prompt, user_prompt), matching both
    # call patterns used in this script.
    prompt = f"{prompt_sys} {prompt_usr}" if prompt_usr else prompt_sys
    response = VISION_LLM.generate(
        model='llava',
        prompt=f"This is a chat between a user and an assistant. The assistant is helping the user to describe an image. {prompt}",
        images=[image_base64]
    )
    print(response)
    return "" if "pass" in response["response"].lower() else response["response"]
def gpt4v(image_base64, prompt_sys: str, prompt_usr: str, max_tokens: int = 150):
response_1 = VISION_LLM.chat.completions.create(
model="gpt-4-vision-preview",
messages=[
{
"role": "system",
"content": f"This is a chat between a user and an assistant. The assistant is helping the user to describe an image. {prompt_sys}",
},
{
"role": "user",
"content": [
{"type": "text", "text": f"{prompt_usr}"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
},
},
],
}
],
max_tokens=max_tokens,
stream=False
)
if response_1 and response_1.choices:
if len(response_1.choices) > 0:
first_choice = response_1.choices[0]
if first_choice.message and first_choice.message.content:
comment_content = first_choice.message.content
if "PASS" in comment_content:
return ""
print(f"Generated comment: {comment_content}")
response_2 = VISION_LLM.chat.completions.create(
model="gpt-4-vision-preview",
messages=[
{
"role": "system",
"content": f"This is a chat between a user and an assistant. The assistant is helping the user to describe an image. {prompt_sys}",
},
{
"role": "user",
"content": [
{"type": "text", "text": f"{prompt_usr}"},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
},
},
],
},
{
"role": "assistant",
"content": comment_content
},
{
"role": "user",
"content": "Please refine it, and remember to ONLY include the caption or comment, nothing else! That means no preface, no postscript, no notes, no reflections, and not even any acknowledgment of this follow-up message. I need to be able to use your output directly on social media. Do include emojis though."
}
],
max_tokens=max_tokens,
stream=False
)
if response_2 and response_2.choices:
if len(response_2.choices) > 0:
first_choice = response_2.choices[0]
if first_choice.message and first_choice.message.content:
final_content = first_choice.message.content
print(f"Generated comment: {final_content}")
if "PASS" in final_content:
return ""
else:
return final_content
print("Vision response did not contain expected data.")
print(f"Vision response: {response_1}")
sleep(15)
try_again = gpt4v(image_base64, prompt_sys, prompt_usr, max_tokens)
return try_again
def encode_image_to_base64(image_path):
    if os.path.exists(image_path):
        with Image.open(image_path) as image:
            output_buffer = BytesIO()
            # Convert to RGB so PNGs with an alpha channel can be re-encoded as JPEG.
            image.convert('RGB').save(output_buffer, format='JPEG')
            byte_data = output_buffer.getvalue()
            base64_str = base64.b64encode(byte_data).decode('utf-8')
            return base64_str
    else:
        print(f"Error: File does not exist at {image_path}")
        return None
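# update_prompt walks a ComfyUI workflow dict, substituting the API_PPrompt/API_SPrompt/API_NPrompt
# placeholders from the post config, randomizing seeds, and returning the key of the SaveImage node
# whose filename_prefix is 'API_'.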
def update_prompt(workflow: dict, post: dict, positive: str, found_key=[None], path=None):
if path is None:
path = []
try:
if isinstance(workflow, dict):
for key, value in workflow.items():
current_path = path + [key]
if isinstance(value, dict):
if value.get('class_type') == 'SaveImage' and value.get('inputs', {}).get('filename_prefix') == 'API_':
found_key[0] = key
update_prompt(value, post, positive, found_key, current_path)
elif isinstance(value, list):
# Recursive call with updated path for each item in a list
for index, item in enumerate(value):
update_prompt(item, post, positive, found_key, current_path + [str(index)])
if value == "API_PPrompt":
workflow[key] = post.get(value, "") + positive
# print(f"Updated API_PPrompt to: {workflow[key]}")
elif value == "API_SPrompt":
workflow[key] = post.get(value, "")
# print(f"Updated API_SPrompt to: {workflow[key]}")
elif value == "API_NPrompt":
workflow[key] = post.get(value, "")
# print(f"Updated API_NPrompt to: {workflow[key]}")
elif key == "seed" or key == "noise_seed":
workflow[key] = random.randint(1000000000000, 9999999999999)
# print(f"Updated seed to: {workflow[key]}")
elif (key == "width" or key == "max_width" or key == "scaled_width") and (value == 1023 or value == 1025):
workflow[key] = post.get(value, "")
elif (key == "dimension" or key == "height" or key == "max_height" or key == "scaled_height") and (value == 1023 or value == 1025):
workflow[key] = post.get(value, "")
except Exception as e:
print(f"Error in update_prompt at path {' -> '.join(path)}: {e}")
raise
return found_key[0]
##################################
### IMAGE GENERATION FUNCTIONS ###
##################################
def generate_and_post_image(chosen_post=None):
    """
    The main function that orchestrates the workflow from prompt update, image generation, to Instagram posting.
    """
    if chosen_post and chosen_post in ACCOUNT_CONFIG['posts']:
        post = ACCOUNT_CONFIG['posts'][chosen_post]
        print(f"Loaded post for {chosen_post}")
    else:
        print(f"Unable to load post for {chosen_post}")
        chosen_post = choose_post(ACCOUNT_CONFIG['posts'])
        post = ACCOUNT_CONFIG['posts'][chosen_post]
        print(f"Defaulted to {chosen_post}")
    image_concept = query_llm(llmPrompt=post['llmPrompt'], max_tokens=180)
    print(f"Image concept: {image_concept}")
    workflow_name = random.choice(post['workflows'])
    workflow_data = None
    if args.fast:
        workflow_data = load_json(None, f"{workflow_name}_fast")
    if workflow_data is None:
        workflow_data = load_json(None, workflow_name)
    jpg_file_path = image_gen(workflow_data, post, image_concept)
    # Caption the generated image with the vision helpers; IMG_DESCRIPTION_SYS is assumed as the
    # system prompt here, and the description file is named after the generated JPEG.
    encoded_image = encode_image_to_base64(jpg_file_path)
    instagram_description = llava(encoded_image, IMG_DESCRIPTION_SYS, image_concept) if args.llava or not args.openai else gpt4v(encoded_image, IMG_DESCRIPTION_SYS, image_concept)
    instagram_description = re.sub(r'^["\'](.*)["\']$', r'\1', instagram_description)
    description_filename = os.path.splitext(os.path.basename(jpg_file_path))[0] + ".txt"
    description_path = os.path.join(ACCOUNT_IMAGES_DIR, description_filename)
    with open(description_path, "w") as desc_file:
        desc_file.write(instagram_description)
    upload_photo(jpg_file_path, instagram_description)
    print(f"Image successfully uploaded to Instagram with description: {instagram_description}")
    print("")

def image_gen(workflow_data: dict, post: dict, image_concept: str):
    # post is the chosen post-type config and image_concept is the LLM-generated positive prompt
    # used to fill in the workflow before it is queued on the ComfyUI server.
    saved_file_key = update_prompt(workflow=workflow_data, post=post, positive=image_concept)
    print("")
    print(f"Saved file key: {saved_file_key}")
    prompt_id = queue_prompt(workflow_data)
    print(f"Prompt ID: {prompt_id}")
    status_data = poll_status(prompt_id)
    image_data = get_image(status_data, saved_file_key)
    print("Image data obtained")
    jpg_file_path = save_as_jpg(image_data, prompt_id, 1440, 90)
    print(f"Image successfully saved to {jpg_file_path}")
    return jpg_file_path
def image_gen2(prompt: str, model: str):
response = IMG_GEN.images.generate(
model=model,
prompt=prompt,
size="1024x1024",
quality="standard",
n=1,
)
image_url = response.data[0].url
image_path = download_and_resize_image(image_url)
return image_path
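# Submit a workflow to the local ComfyUI server's /prompt endpoint; returns the prompt_id
# that poll_status() uses to watch for completion.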
def queue_prompt(prompt: dict):
response = requests.post(f"{SD_SERVER_ADDRESS}/prompt", json={"prompt": prompt, "client_id": CLIENT_ID})
if response.status_code == 200:
return response.json().get('prompt_id')
else:
raise Exception(f"Failed to queue prompt. Status code: {response.status_code}, Response body: {response.text}")
def poll_status(prompt_id):
"""Poll the job status until it's complete and return the status data."""
start_time = time.time() # Record the start time
while True:
elapsed_time = int(time.time() - start_time) # Calculate elapsed time in seconds
status_response = requests.get(f"{SD_SERVER_ADDRESS}/history/{prompt_id}")
# Use \r to return to the start of the line, and end='' to prevent newline
print(f"\rGenerating {prompt_id}. Elapsed time: {elapsed_time} seconds", end='')
if status_response.status_code != 200:
raise Exception("Failed to get job status")
status_data = status_response.json()
job_data = status_data.get(prompt_id, {})
if job_data.get("status", {}).get("completed", False):
print()
print(f"{prompt_id} completed in {elapsed_time} seconds.")
return job_data
time.sleep(1)
def get_image(status_data, key):
"""Extract the filename and subfolder from the status data and read the file."""
try:
outputs = status_data.get("outputs", {})
images_info = outputs.get(key, {}).get("images", [])
if not images_info:
raise Exception("No images found in the job output.")
image_info = images_info[0] # Assuming the first image is the target
filename = image_info.get("filename")
subfolder = image_info.get("subfolder", "") # Default to empty if not present
file_path = os.path.join(RESULT_DIRECTORY, subfolder, filename)
with open(file_path, 'rb') as file:
return file.read()
except KeyError as e:
raise Exception(f"Failed to extract image information due to missing key: {e}")
except FileNotFoundError:
raise Exception(f"File {filename} not found at the expected path {file_path}")
################################
### PRIMARY ACTIVE FUNCTIONS ###
################################
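# load_post returns the config for chosen_post, falling back to a frequency-weighted random
# choice via choose_post() when the requested post type is unknown.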
def load_post(chosen_post: str = "default"):
if chosen_post in ACCOUNT_CONFIG['posts']:
post = ACCOUNT_CONFIG['posts'][chosen_post]
print(f"Loaded post for {chosen_post}")
else:
print(f"Unable to load post for {chosen_post}. Choosing a default post.")
chosen_post = choose_post(ACCOUNT_CONFIG['posts'])
post = ACCOUNT_CONFIG['posts'][chosen_post]
print(f"Defaulted to {chosen_post}")
return post
def handle_image_workflow(chosen_post=None):
"""
Orchestrates the workflow from prompt update, image generation, to either saving the image and description locally
or posting to Instagram based on the local flag.
"""
post = load_post(chosen_post)
workflow_name = random.choice(post['workflows'])
print(f"Workflow name: {workflow_name}")
print(f"Generating image concept for {chosen_post} and {workflow_name} now.")
image_concept = query_llm(llmPrompt=post['llmPrompt'], max_tokens=120)
print(f"Image concept for {chosen_post}: {image_concept}")
workflow_data = None
if args.fast:
workflow_data = load_json(None, f"{workflow_name}_fast")
if workflow_data is None:
workflow_data = load_json(None, workflow_name)
    if args.dalle and not args.local:
        # image_gen2 is the DALL-E 3 path; the ComfyUI workflow path follows below.
        jpg_file_path = image_gen2(image_concept, "dall-e-3")
else:
saved_file_key = update_prompt(workflow=workflow_data, post=post, positive=image_concept)
print(f"Saved file key: {saved_file_key}")
prompt_id = queue_prompt(workflow_data)
print(f"Prompt ID: {prompt_id}")
status_data = poll_status(prompt_id)
image_data = get_image(status_data, saved_file_key)
jpg_file_path = save_as_jpg(image_data, prompt_id, 1440, 90)
handle_single_image(jpg_file_path, chosen_post)
def handle_single_image(jpg_file_path: str, chosen_post: str, prompt: str = None):
if not prompt:
prompt = ACCOUNT_CONFIG['posts'][chosen_post]['Vision_Prompt']
encoded_string = encode_image_to_base64(jpg_file_path)
print(f"Image successfully encoded from {jpg_file_path}")
    # IMG_DESCRIPTION_SYS is assumed as the system prompt for the GPT-4V caption.
    instagram_description = llava(encoded_string, prompt) if args.llava or not args.openai else gpt4v(encoded_string, IMG_DESCRIPTION_SYS, prompt, 150)
instagram_description = re.sub(r'^["\'](.*)["\']$', r'\1', instagram_description)
ghost_tags = ACCOUNT_CONFIG['posts'][chosen_post]['ghost_tags']
# Save description to file and upload or save locally
description_filename = jpg_file_path.rsplit('.', 1)[0] + ".txt"
description_path = os.path.join(ACCOUNT_IMAGES_DIR, description_filename)
with open(description_path, "w") as desc_file:
desc_file.write(instagram_description)
if not args.local:
post_url = upload_photo(jpg_file_path, instagram_description)
print(f"Image posted at {post_url}")
if not args.noghost:
title_prompt = f"Generate a short 3-5 word title for this image, which already includes the following description: {instagram_description}"
img_title = llava(encoded_string, title_prompt)
ghost_text = f"{instagram_description}\n\n<a href=\"{post_url}\">Instagram link</a>"
ghost_url = post_to_ghost(img_title, jpg_file_path, ghost_text, ghost_tags)
print(f"Ghost post: {ghost_url}")
else:
img_title = None
else:
print(f"Image and description saved locally due to args.local being True.")
if args.wallpaper:
change_wallpaper(jpg_file_path)
print(f"Wallpaper changed.")
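# choose_post picks a post type at random, weighted by each type's 'frequency' value
# in the account config.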
def choose_post(posts):
total_frequency = sum(posts[post_type]['frequency'] for post_type in posts)
random_choice = random.randint(1, total_frequency)
current_sum = 0
for post_type, post_info in posts.items():
current_sum += post_info['frequency']
if random_choice <= current_sum:
return post_type
def load_json(json_payload, workflow):
if json_payload:
return json.loads(json_payload)
elif workflow:
workflow_path = os.path.join(WORKFLOWS_DIR, f"{workflow}.json" if not workflow.endswith('.json') else workflow)
with open(workflow_path, 'r') as file:
return json.load(file)
else:
raise ValueError("No valid input provided.")
##################################
### IMAGE PROCESSING FUNCTIONS ###
##################################
def resize_and_convert_image(image_path, max_size=2160, quality=80):
with Image.open(image_path) as img:
# Resize image
ratio = max_size / max(img.size)
new_size = tuple([int(x * ratio) for x in img.size])
img = img.resize(new_size, Image.Resampling.LANCZOS)
# Convert to JPEG
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='JPEG', quality=quality)
img_byte_arr = img_byte_arr.getvalue()
return img_byte_arr
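# save_as_jpg writes the raw PNG bytes returned by the image server to disk, downsizes the
# longest side to max_size, re-encodes as JPEG at the given quality, and removes the temporary PNG.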
def save_as_jpg(image_data, prompt_id, max_size=2160, quality=80):
filename_png = f"{prompt_id}.png"
image_path_png = os.path.join(ACCOUNT_IMAGES_DIR, filename_png)
try:
# Save the raw PNG data to a file
with open(image_path_png, 'wb') as file:
file.write(image_data)
# Open the PNG, resize it, and save it as JPEG
with Image.open(image_path_png) as img:
# Resize image if necessary
if max(img.size) > max_size:
ratio = max_size / max(img.size)
new_size = tuple([int(x * ratio) for x in img.size])
img = img.resize(new_size, Image.Resampling.LANCZOS)
# Prepare the path for the converted image
new_file_name = f"{prompt_id}.jpeg"
new_file_path = os.path.join(ACCOUNT_IMAGES_DIR, new_file_name)
# Convert to JPEG and save
img.convert('RGB').save(new_file_path, format='JPEG', quality=quality)
# Optionally, delete the temporary PNG file
os.remove(image_path_png)
return new_file_path
except Exception as e:
print(f"Error processing image: {e}")
return None
def upload_photo(path, caption, title: str=None):
print(f"Uploading photo from {path}...")
media = cl.photo_upload(path, caption)
post_url = f"https://www.instagram.com/p/{media.code}/"
return post_url
def format_duration(seconds):
"""Return a string representing the duration in a human-readable format."""
if seconds < 120:
return f"{int(seconds)} sec"
elif seconds < 6400:
return f"{int(seconds // 60)} min"
else:
return f"{seconds / 3600:.2f} hr"
########################
### HELPER FUNCTIONS ###
########################
def change_wallpaper(image_path):
command = """
osascript -e 'tell application "Finder" to set desktop picture to POSIX file "{}"'
""".format(image_path)
subprocess.run(command, shell=True)
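# Note: this deliberately shadows the `sleep` imported from `time`, so every delay in the
# script is randomized.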
def sleep(seconds):
"""Sleep for a random amount of time, approximately the given number of seconds."""
sleepupto(seconds*0.66, seconds*1.5)
def sleepupto(min_seconds, max_seconds=None):
interval = random.uniform(min_seconds if max_seconds is not None else 0, max_seconds if max_seconds is not None else min_seconds)
start_time = time.time()
end_time = start_time + interval
with tqdm(total=interval, desc=f"Sleeping for {format_duration(interval)}", unit=" sec", ncols=75, bar_format='{desc}: {bar} {remaining}') as pbar:
while True:
current_time = time.time()
elapsed_time = current_time - start_time
remaining_time = end_time - current_time
if elapsed_time >= interval:
break
duration = min(1, interval - elapsed_time) # Adjust sleep time to not exceed interval
time.sleep(duration)
pbar.update(duration)
# Update remaining time display
pbar.set_postfix_str(f"{format_duration(remaining_time)} remaining")
########################
### GHOST FUNCTIONS ###
########################
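# Ghost Admin API authentication: the Admin API key is 'id:secret'; requests are signed with a
# short-lived (5-minute) HS256 JWT whose kid is the id and whose signing key is the hex-decoded secret.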
def generate_jwt_token():
id, secret = GHOST_API_KEY.split(':')
iat = int(date.now().timestamp())
header = {'alg': 'HS256', 'typ': 'JWT', 'kid': id}
payload = {'iat': iat, 'exp': iat + 5 * 60, 'aud': '/admin/'}
token = jwt.encode(payload, bytes.fromhex(secret), algorithm='HS256', headers=header)
return token.decode('utf-8') if isinstance(token, bytes) else token
def post_to_ghost(title, image_path, html_content, ghost_tags):
jwt_token = generate_jwt_token()
# Prepare headers for the request
    ghost_headers = {'Authorization': f'Ghost {jwt_token}'}  # Ghost Admin API expects the 'Ghost' scheme, not 'Bearer'
# Open the image file in binary mode
with open(image_path, 'rb') as f:
# Define the files dictionary correctly
files = {'file': (image_path.split('/')[-1], f, 'image/jpeg')}
# Upload the image to Ghost
image_response = requests.post(f"{GHOST_API_URL}/images/upload/", headers=ghost_headers, files=files)
image_response.raise_for_status() # Ensure the request was successful
image_url = image_response.json()['images'][0]['url'] # Extract the URL of the uploaded image
# Construct HTML content with image first, horizontal line, and then original html_content
updated_html_content = f'<img src="{image_url}" alt="Image"/><hr/> {html_content}'
# Embed the updated_html_content directly without using image cards, since the image is already included in HTML
mobiledoc = {
"version": "0.3.1",
"atoms": [],
"cards": [["html", {"cardName": "html", "html": updated_html_content}]],
"markups": [],
"sections": [[10, 0]],
}
mobiledoc = json.dumps(mobiledoc)
post_data = {
'posts': [{
'title': title,
'mobiledoc': mobiledoc,
'status': 'published',
'tags': ghost_tags # Include the tags in the post data
}]
}
post_response = requests.post(f"{GHOST_API_URL}/posts/", json=post_data, headers=ghost_headers)
post_response.raise_for_status()
post_url = post_response.json()['posts'][0]['url']
return post_url
########################################################
########################################################
########################################################
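# Entry point: with --local, only generate images (and optionally set the wallpaper); otherwise
# log into Instagram (avoiding the tail end of the 30-second TOTP window), then either act on a
# specific URL or user, or loop indefinitely alternating between comments and posts.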
if __name__ == "__main__":
current_unix_time = int(date.now().timestamp())
if args.local:
if args.postfile:
handle_single_image(args.postfile, args.posttype)
else:
while True:
handle_image_workflow(args.posttype)
else:
time_since_rollover = current_unix_time - rollover_time
time_remaining = 30 - (time_since_rollover % 30)
if time_remaining < 4:
print("Too close to end of TOTP counter. Waiting.")
sleepupto(5, 5)
if not args.newsession and os.path.exists(IG_SESSION_PATH):
cl.load_settings(IG_SESSION_PATH)
print("Loaded past session.")
elif args.newsession and cl.login(IG_USERNAME, IG_PASSWORD, verification_code=TOTP.now()):
cl.dump_settings(IG_SESSION_PATH)
print("Logged in and saved new session.")
else:
raise Exception(f"Failed to login as {IG_USERNAME}.")
if IMAGE_URL:
comment_on_specific_media(IMAGE_URL, args.commenttype)
elif args.user:
            # comment_on_user_media expects (user, comment_type, amount).
            comment_on_user_media(args.user, args.commenttype or "default", args.count)
else:
if args.postfile:
handle_single_image(args.postfile, args.posttype)
else:
while True:
if args.commentonly:
print(f"Generating a comment about {args.commenttype}")
comment_by_type(args.commenttype, args.count, args.hashtag)
sleep(180)
elif args.postonly:
print(f"Generating a post about {args.posttype}")
handle_image_workflow(args.posttype)
sleep(360)
else:
if random.random() < 0.75:
if args.commenttype:
print(f"Generating a comment about {args.commenttype}")
comment_by_type(args.commenttype, 1)
sleep(180)
else:
chosen_comment = random.choice(list(ACCOUNT_CONFIG["comments"].keys()))
print(f"Generating a comment about {chosen_comment}")
comment_by_type(chosen_comment)
sleep(180)
else:
if args.posttype:
print(f"Generating a post about {args.posttype}")
handle_image_workflow(args.posttype)
sleep(360)
else:
chosen_post = random.choice(list(ACCOUNT_CONFIG["posts"].keys()))
print(f"Generating a post about {chosen_post}")
handle_image_workflow(chosen_post)
sleep(360)