Module: fetch¶
The fetch module starts at the server where you're logged in and searches for a hashtag. Once it has every toot your home server knows about, it looks at where those toots came from. For each remote server it finds mentioned, it calls fetch_hashtag_remote(). Each time it connects to a new server, it fetches all the toots that server knows about for the hashtag, then scans that batch for servers it hasn't seen yet and adds them to the list of servers to contact.
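The crawl is essentially a breadth-first walk over servers, tracked with two sets. Here is a minimal sketch of that bookkeeping, assuming a config object and the module's fetch_hashtag_remote(); the crawl() wrapper itself is illustrative, not part of the module:

```python
# Illustrative sketch of the crawl bookkeeping, not the module's actual code.
def crawl(config, home_toots, home_server):
    servers_done = {home_server}
    # Every toot's uri starts with the server it originated on.
    servers_todo = {'/'.join(t['uri'].split('/')[0:3]) for t in home_toots} - servers_done
    while servers_todo:
        server = servers_todo.pop()
        servers_done.add(server)
        toots = fetch_hashtag_remote(config, server)  # None if the server can't be read
        if toots is None:
            continue
        # Queue any servers mentioned in this batch that we haven't seen yet.
        servers_todo |= {'/'.join(t['uri'].split('/')[0:3]) for t in toots} - servers_done
```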
Public API¶
This code depends on Mastodon.py and uses it to connect to the servers it finds. If you know anything about the fediverse, you know there's more than just Mastodon servers out there: Pleroma, Akkoma, and various other ActivityPub-compatible servers. Some are derived from Mastodon and implement the same APIs; others don't. Some Mastodon servers offer public read APIs, others don't. Servers that allow public reads of their API will hand over the details of their toots. Servers that don't allow public reads, or that don't implement a Mastodon-compatible timeline API, are quietly skipped.
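For servers that do allow anonymous reads, the public hashtag timeline is a single Mastodon.py call. A hedged sketch (the server name and limit are illustrative; expect this to fail on locked-down or non-Mastodon-compatible servers):

```python
from mastodon import Mastodon, MastodonError

try:
    # No credentials: this only works if the server permits public API reads.
    m = Mastodon(api_base_url='https://example.social')
    toots = m.timeline_hashtag('monsterdon', limit=40)
    print(f"Got {len(toots)} toots")
except MastodonError as e:
    # Closed APIs, auth requirements, or incompatible servers end up here.
    print(f"Skipping server: {e}")
```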
Directory Structure¶
Fetch organizes data in a directory based on the date of the event. So if the journaldir is data, the hashtag is monsterdon, and the event_date is 2025-10-19, then the directory structure is data/2025/10/19 and all the files will have the monsterdon hashtag in their name. See the example below:
data
├── 2025
│   └── 10
│       ├── 19
│       │   ├── data-monsterdon-analysis.json
│       │   ├── data-monsterdon-fetch.json
│       │   ├── index.md
│       │   ├── monsterdon-20251019.png
│       │   ├── monsterdon-20251019.txt
│       │   ├── monsterdon-beige.party.json
│       │   ├── monsterdon-bolha.us.json
│       │   ... lots more files, one per server...
│       │   ├── wordcloud-monsterdon-20251019-remove.png
│       │   └── wordcloud-monsterdon-20251019-remove.txt
List of files¶
Every file has the hashtag and the date in its name. If you ran the same analysis on 2 different hashtags on the same day, none of the files would conflict, though they would all be stored in the same directory.
- data-monsterdon-analysis.json: Analysis of the results. It contains the contents of all the top posts and a bunch of meta statistics like top poster, busiest server, etc.
- data-monsterdon-fetch.json: Data about the fetch. Mainly the date it was done, the servers that succeeded and failed, and the gross total (not de-duplicated) of posts we fetched.
- index.md: The blog post entry. It's copied manually to the blog post directory.
- monsterdon-20251019.png: The histogram graph of activity generated by graph
- monsterdon-20251019.txt: The alt text for the histogram graph generated by graph
- monsterdon-[servername].json: The raw content of posts downloaded from server servername
- wordcloud-monsterdon-20251019-remove.png: The wordcloud generated by graph
- wordcloud-monsterdon-20251019-remove.txt: The alt text for the wordcloud generated by graph
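As a rough sketch of how those names are composed (the values are illustrative; the real logic lives in create_journal_directory(), write_journal(), and write_json() below):

```python
from os.path import join

journaldir, journalfile, year, month, day = 'data', 'monsterdon', '2025', '10', '19'

day_dir = join(journaldir, year, month, day)                   # data/2025/10/19
per_server = join(day_dir, f"{journalfile}-beige.party.json")  # raw toots from one server
fetch_meta = join(day_dir, f"data-{journalfile}-fetch.json")   # metadata about the fetch run
analysis = join(day_dir, f"data-{journalfile}-analysis.json")  # analysis results
```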
Module for fetching toots for a hashtag.
TimestampEncoder
¶
Bases: JSONEncoder
A convenience encoder that converts Pandas Timestamp objects to an ISO string
Source code in mastoscore/fetch.py
class TimestampEncoder(JSONEncoder):
""" A convenience function that converts Pandas Timestamp objects to an ISO string """
def default(self, o):
if isinstance(o, Timestamp):
return o.isoformat()
if isinstance(o, pd.api.typing.NaTType):
return "null"
return super().default(o)
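For example, it lets json.dumps() handle a dict containing pandas timestamps; the value below is made up:

```python
from json import dumps
from pandas import Timestamp
from mastoscore.fetch import TimestampEncoder

payload = {"fetched_at": Timestamp("2025-10-19T20:00:00Z")}
print(dumps(payload, cls=TimestampEncoder))
# -> {"fetched_at": "2025-10-19T20:00:00+00:00"}
```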
check_journaldir(dir_name)
¶
Check if a directory exists and create it if it doesn't.
Parameters¶
- dir_name (str): The name of the directory to check/create.
Returns:¶
bool: True if the directory exists (or was created) and is writable; False if it couldn't be used or created.
Source code in mastoscore/fetch.py
def check_journaldir(dir_name: str) -> bool:
"""
Check if a directory exists and create it if it doesn't.
# Parameters
- dir_name (str): The name of the directory to check/create.
# Returns:
bool: True if the directory exists (or was created) and is writable; False if it couldn't
be used or created.
"""
global logger
journaldir = abspath(dir_name)
if exists(journaldir):
if isdir(journaldir):
if access(journaldir, W_OK):
return True
else:
logger.critical(f"'{journaldir}' directory exists but is not writeable")
return False
else:
logger.critical(f"Something already exists at '{journaldir}' but it is not a directory")
return False
else:
try:
makedirs(journaldir)
logger.warning(f"Created '{journaldir}' successfully.")
return True
except OSError as e:
logger.critical(f"Error creating directory '{journaldir}': {e}")
return False
except Exception as e:
logger.critical(f"Error creating directory '{journaldir}': {e}")
return False
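A minimal usage sketch (the directory name is illustrative):

```python
from mastoscore.fetch import check_journaldir

if not check_journaldir('data'):
    raise SystemExit("journal directory is not usable")
```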
create_journal_directory(config)
¶
Create a hierarchical directory structure for journal files.
Parameters¶
- config: A ConfigParser object from the config module. The base directory is read from fetch:journaldir and the date components from mastoscore:event_year, mastoscore:event_month, and mastoscore:event_day.
Returns¶
Full path to the created directory, or None if creation failed
Source code in mastoscore/fetch.py
def create_journal_directory(config:ConfigParser) -> str | None:
"""
Create a hierarchical directory structure for journal files.
# Parameters
- **config**: A ConfigParser object from the [config](module-config.md) module. The base directory comes from `fetch:journaldir` and the date components from `mastoscore:event_year`, `mastoscore:event_month`, and `mastoscore:event_day`.
# Returns
Full path to the created directory, or None if creation failed
"""
global logger
# Get date components from config
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
base_dir = config.get('fetch', 'journaldir')
# Create the full path
dir_path = join(base_dir, year, month, day)
dir_path = abspath(dir_path)
# Check if directory exists
if exists(dir_path):
if isdir(dir_path):
if access(dir_path, W_OK):
return dir_path
else:
logger.critical(f"Directory '{dir_path}' exists but is not writeable")
return None
else:
logger.critical(f"Path '{dir_path}' exists but is not a directory")
return None
# Create directory structure
try:
makedirs(dir_path, exist_ok=True)
logger.debug(f"Created directory structure: '{dir_path}'")
return dir_path
except Exception as e:
logger.critical(f"Error creating directory structure '{dir_path}': {e}")
return None
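A minimal usage sketch, assuming an illustrative config that carries only the keys this function reads:

```python
from configparser import ConfigParser
from mastoscore.fetch import create_journal_directory

config = ConfigParser()
config['mastoscore'] = {'event_year': '2025', 'event_month': '10', 'event_day': '19'}
config['fetch'] = {'journaldir': 'data'}

dir_path = create_journal_directory(config)
# e.g. '/abs/path/to/data/2025/10/19', or None if the path couldn't be created
```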
fetch(config)
¶
This is the top-level function that will download toots and store them in a JSON cache. This
function will create a tooter and log in to the server named in the cred_file.
Parameters¶
- config: A ConfigParser object from the config module
Config Parameters Used¶
- fetch:lookback: Number of days to look back in time. Toots older than that are ignored
- fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
- fetch:max: Max number of toots to pull from a server (default: 2000)
- fetch:hashtag: Hashtag to search for
- fetch:dry_run: If True, we contact our home server, but make no remote connections. If False, do it for real.
- fetch:api_base_url: Starting server for our first connection
- fetch:cred_file: Implicitly used when we create our Tooter
- fetch:overwrite: If True, overwrite files (re-fetch). If False and a file exists for a server, skip it. Default: False
- fetch:journaldir: Base directory to write journal files into
- fetch:journalfile: Journal file name template; used to name the per-server JSON files
- fetch:debug: Logging level for this module
- analyse:timezone: Timezone used to timestamp the start and end of the fetch
- mastoscore:event_year: Year of the event (YYYY)
- mastoscore:event_month: Month of the event (MM)
- mastoscore:event_day: Day of the event (DD)
Returns¶
None
Source code in mastoscore/fetch.py
def fetch(config: ConfigParser) -> None:
"""
This is the top-level function that will download toots and store them in a JSON cache. This
function will create a [tooter](module-tooter.md) and login to the server named in the `cred_file`.
# Parameters
- **config**: A ConfigParser object from the [config](module-config.md) module
# Config Parameters Used
- fetch:lookback: Number of days to look back in time. Toots older than that are ignored
- fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
- fetch:max: Max number of toots to pull from a server (default: 2000)
- fetch:hashtag: Hashtag to search for
- fetch:dry_run: If True, we contact our home server, but make no remote connections. If False, do it for real.
- fetch:api_base_url: Starting server for our first connection
- fetch:cred_file: Implicitly used when we create our [Tooter](module-tooter.md)
- fetch:overwrite: If True, overwrite files (re-fetch). If False and a file exists for a server, skip it. Default: False
- mastoscore:event_year: Year of the event (YYYY)
- mastoscore:event_month: Month of the event (MM)
- mastoscore:event_day: Day of the event (DD)
# Returns
None
"""
global logger
lookback = config.getint('fetch', 'lookback')
journalfile = config.get('fetch', 'journalfile')
maxtoots = config.getint('fetch', 'max')
hashtag = config.get('fetch', 'hashtag')
debug = config.getint('fetch', 'debug')
dry_run = config.getboolean('fetch', 'dry_run')
api_base_url = config.get('fetch', 'api_base_url')
overwrite = config.getboolean('fetch', 'overwrite', fallback=False)
timezone = pytimezone(config.get('analyse', 'timezone'))
fresults = {}
start_time = datetime.datetime.now(tz=timezone)
end_time = datetime.datetime.now(tz=timezone)
fetch_duration = datetime.timedelta(0)
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(levelname)s\t%(message)s')
logger.setLevel(debug)
interval = datetime.timedelta(days=lookback)
oldest_date = datetime.datetime.now(datetime.timezone.utc) - interval
oldest_str = oldest_date.strftime("%Y-%m-%d")
logger.debug(f"Lookback is {lookback} days, which is {oldest_str}")
# Create directory structure
dir_path = create_journal_directory(config)
# Make sure we can write data before we try to fetch it
if dir_path is None:
return
try:
t = Tooter(config, 'fetch')
except Exception as e:
logger.critical(f"Failed to create Tooter for {api_base_url}")
logger.critical(e)
exit(1)
logger.debug(
f"Looking for at most {maxtoots} toots visible from {t.api_base_url} with #{hashtag} since {oldest_str}")
toots = t.search_hashtag(hashtag, interval, maxtoots)
if not toots:
logger.error(
"We found 0 toots for hashtag %s on %s", f"#{hashtag}", api_base_url)
return
else:
logger.info(f"Found {len(toots)} local toots")
df = toots2df(toots, api_base_url)
write_journal(config, df, api_base_url.split('/')[2])
# Look for non-local statuses. Let's figure out how many remote servers we need
# to contact. This splits a URI like https://example.net/blah/blah/blah on slashes
# takes the first 0-3 elements, and rejoins it on slashes. Produces https://example.net
uris = ['/'.join(s.split('/')[0:3]) for s in df['uri']]
# servers_done holds the list of servers that we've already contacted
# servers_todo holds the list we still need to contact
servers_done = set()
servers_todo = set(uris)
servers_fail = set()
total_toots = len(df)
try:
# don't need to contact our own server, because we already got the local toots.
servers_todo.remove(api_base_url)
except Exception:
logger.warning(
f"api_base_url ({api_base_url}) wasn't in the set.")
servers_done.add(api_base_url)
if dry_run:
# In a dry run, we don't reach out to remotes
logger.info(
f"We found {len(servers_todo)} remote servers, but dry_run is set, so we won't contact them")
logger.info(f"Remotes: {servers_todo}")
logger.info(
f"Done! Found {len(toots)} total toots across {len(servers_done)} servers.")
return
else:
del df
del toots
# Systematically reach out to each server and pull all the hashtag toots.
# This will likely return some toots that did not appear in the original set,
# because nobody on our local server follows the person on the other server.
while len(servers_todo) > 0:
uri = servers_todo.pop()
# If we don't want to overwrite files, skip servers whose journal file already exists
server = uri.split('/')[2]
# amazonq-ignore-next-line
jfilename = join(dir_path, f"{journalfile}-{server}.json")
if not overwrite:
if exists(jfilename) and (stat(jfilename).st_size > 0):
logger.warning(f"{jfilename} exists, skipping")
continue
else:
logger.warning(f"Overwriting {jfilename}!")
newtoots = fetch_hashtag_remote(config, uri)
servers_done.add(uri)
if newtoots is None:
logger.warning(f"Got no toots back from {uri}")
servers_fail.add(uri)
continue
else:
logger.info(
f"Total {total_toots} after adding {len(newtoots)} toots from {uri}")
try:
df = toots2df(newtoots, uri)
except Exception as e:
logger.error(
f"Failed to convert {len(newtoots)} toots from {uri}")
logger.error(e)
continue
if not write_journal(config, df, server):
return
total_toots = total_toots + len(df)
del df
# Did we find any new servers mentioned as a side-effect of fetching this
# latest batch?
newuris = ['/'.join(s['uri'].split('/')[0:3]) for s in newtoots]
n = 0
for server in set(newuris):
if server not in servers_done and server not in servers_todo:
servers_todo.add(server)
n = n + 1
logger.info(
f"Added {n} new servers added by {uri}. Todo: {len(servers_todo)}, Done: {len(servers_done)}, Fail: {len(servers_fail)}")
end_time = datetime.datetime.now(tz=timezone)
fetch_start = start_time.strftime("%a %e %b %Y %H:%M:%S %Z")
fetch_end = end_time.strftime("%a %e %b %Y %H:%M:%S %Z")
fetch_duration = end_time - start_time
duration_string = ""
fetch_hours, fetch_seconds = divmod(int(fetch_duration.total_seconds()), 3600)
if fetch_hours > 1:
duration_string = f"{fetch_hours} hours"
elif fetch_hours == 1:
duration_string = f"{fetch_hours} hour"
fetch_minutes, fetch_seconds = divmod(fetch_seconds, 60)
if fetch_minutes > 1:
duration_string = f"{duration_string} {fetch_minutes} minutes"
elif fetch_minutes == 1:
duration_string = f"{duration_string} {fetch_minutes} minute"
if fetch_seconds > 1:
duration_string = f"{duration_string} {fetch_seconds} seconds"
elif fetch_seconds == 1:
duration_string = f"{duration_string} {fetch_seconds} second"
else:
duration_string = f"{duration_string} exactly"
fresults['total_toots'] = total_toots
fresults['servers_done'] = list(servers_done)
fresults['servers_fail'] = list(servers_fail)
fresults['oldest_date'] = oldest_date.strftime("%a %e %b %Y %H:%M:%S %Z")
fresults['fetch_time'] = fetch_start
fresults['fetch_end'] = fetch_end
fresults['fetch_duration'] = duration_string
fresults['fetch_version'] = __version__
write_json (config, 'fetch', fresults)
logger.info(
f"Done! Collected {total_toots} toots from {len(servers_done)} servers with {len(servers_fail)} failures.")
fetch_hashtag_remote(config, server)
¶
Given a server's base URI (extracted from a toot's uri, such as one returned by Mastodon.status), create a Tooter for that server. Connect and fetch the statuses for the hashtag. Return a few fields, but not all.
Parameters¶
- config: A ConfigParser object from the config module
- server: The api_base_url of a server to fetch from
Config Parameters Used¶
- fetch:lookback: Number of days to look back in time. Toots older than that are ignored
- fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
- fetch:max: Max number of toots to pull from a server (default: 2000)
- fetch:hashtag: Hashtag to search for
Returns¶
List of statuses in the raw JSON format from the API. Fields are not normalised or converted in any way. Since not all ActivityPub servers are exactly the same, you can't even be sure which fields you'll get.
Source code in mastoscore/fetch.py
def fetch_hashtag_remote(config, server: str) -> list | None:
"""
Given a server's base URI (extracted from a toot's uri, such as one returned by Mastodon.status),
create a Tooter for that server. Connect and fetch the statuses for the hashtag. Return a few fields, but not all.
# Parameters
- **config**: A ConfigParser object from the [config](module-config.md) module
- **server**: The api_base_url of a server to fetch from
# Config Parameters Used
- fetch:lookback: Number of days to look back in time. Toots older than that are ignored
- fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
- fetch:max: Max number of toots to pull from a server (default: 2000)
- fetch:hashtag: Hashtag to search for
# Returns
List of statuses in the raw JSON format from the API. Fields are not normalised or
converted in any way. Since not all ActivityPub servers are exactly the same, you can't even
be sure which fields you'll get.
"""
global logger
lookback = config.getint('fetch', 'lookback')
maxtoots = config.getint('fetch', 'max')
hashtag = config.get('fetch', 'hashtag')
interval = datetime.timedelta(days=lookback)
# Make the tooter that will do the searching.
try:
t = Tooter(config, 'fetch', server)
logger.info(f"Tooter created for {server}")
except Exception as e:
logger.warning(f"Failed to create Tooter for {server}")
logger.warning(e)
return None
try:
newtoots = t.search_hashtag(hashtag, interval, maxtoots)
except Exception:
logger.error(
f"fetch_hashtag_remote: failure fetching {hashtag} from {server}.")
return None
return newtoots
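A usage sketch, given a config like the one shown for fetch() above; the toot uri is made up:

```python
# The server argument is a base URI, usually extracted from a toot's uri.
uri = 'https://other.example.social/users/alice/statuses/1'
server = '/'.join(uri.split('/')[0:3])   # 'https://other.example.social'

newtoots = fetch_hashtag_remote(config, server)
if newtoots is None:
    print("server unreachable or not Mastodon-compatible")
```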
read_json(config, filename)
¶
Given a config and a filename (a fragment, like 'analysis'), figure out the path to the right JSON file. Read the file, if it exists, and return its contents in a dict. Return empty dict if there are problems.
Source code in mastoscore/fetch.py
def read_json(config: ConfigParser, filename: str) -> dict:
""" Given a config and a filename (a fragment, like 'analysis'), figure out the path
to the right JSON file. Read the file, if it exists, and return its contents
in a dict. Return empty dict if there are problems.
"""
global logger
logger = logging.getLogger(__name__)
journalfile = config.get('fetch', 'journalfile')
base_name = f"data-{journalfile}-{filename}.json"
# Create directory structure
dir_path = create_journal_directory(config)
if not dir_path:
return {}
jfilename = join(dir_path, base_name)
try:
with open(jfilename, 'r') as jfile:
analysis = load( jfile )
logger.debug( f"Opened {jfilename} and read {len(analysis.keys())} keys from it.")
except (OSError, IOError):
logger.warning(f"Failed to read {filename} analysis in {jfilename}")
return {}
except Exception:
logger.warning(f"Failed to read {filename} analysis in {jfilename}")
return {}
return analysis
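A usage sketch, assuming the same illustrative config as above; the key name is hypothetical:

```python
analysis = read_json(config, 'analysis')
print(analysis.get('top_poster', 'unknown'))
```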
toots2df(toots, api_base_url)
¶
Take in a list of toots from a tooter object, turn it into a pandas dataframe with a bunch of data normalized.
Parameters¶
- toots: list. A list of toots in the same format as returned by the search_hashtag() API
- api_base_url: string. Expected to include the protocol, like https://server.example.com.
Returns¶
A Pandas DataFrame that contains all the toots normalised. Normalisation includes:
- Converting date fields like created_at to timezone-aware datetime objects
- Converting integer fields like reblogs_count to integers
- Adding some columns (see below)
- Discarding all but a few columns. So many different systems return different columns, and I'm only
using a few of them. So I just discard everything else. This cuts down on storage and processing time.
Synthetic columns added:¶
- server: The server part of api_base_url: server.example.com if the api_base_url is https://server.example.com
- userid: The user's name in person@server.example.com format. Note it does not have the leading @ because tagging people is optional.
- local: Boolean that is True if the toot comes from the api_base_url server, False otherwise.
- source: The server part of the server that owns the toot. I might be talking to server.example.com, but they've sent me a copy of a toot from other.example.social.
Source code in mastoscore/fetch.py
def toots2df(toots: list, api_base_url: str) -> pd.DataFrame:
"""
Take in a list of toots from a tooter object, turn it into a
pandas dataframe with a bunch of data normalized.
# Parameters
- toots: list. A list of toots in the same format as returned by the search_hashtag() API
- api_base_url: string. Expected to include protocol, like `https://server.example.com`.
# Returns
A Pandas DataFrame that contains all the toots normalised. Normalisation includes:
- Converting date fields like `created_at` to timezone-aware `datetime` objects
- Converting integer fields like `reblogs_count` to integers
- Adding some columns (see below)
- Discarding all but a few columns. So many different systems return different columns, and I'm only
using a few of them. So I just discard everything else. This cuts down on storage and processing time.
# Synthetic columns added:
- **server**: The server part of `api_base_url`: `server.example.com` if the `api_base_url` is `https://server.example.com`
- **userid**: The user's name in `person@server.example.com` format. Note it does not have the leading `@` because tagging people is optional.
- **local**: Boolean that is **True** if the toot comes from the `api_base_url` server. **False** otherwise.
- **source**: The server part of the server who owns the toot. I might be talking to `server.example.com`, but they've sent me a copy of a toot from `other.example.social`.
"""
df = pd.json_normalize(toots)
df['source'] = api_base_url.split('/')[2]
df['local'] = [True if i.startswith(
api_base_url) else False for i in df['uri']]
# make a new "server" column off of uris
df['server'] = [n.split('/')[2] for n in df['uri']]
df['userid'] = df['account.username'] + '@' + df['server']
df['reblogs_count'] = df['reblogs_count'].astype(int)
df['replies_count'] = df['replies_count'].astype(int)
df['favourites_count'] = df['favourites_count'].astype(int)
df['created_at'] = pd.to_datetime(
df['created_at'], utc=True, format='ISO8601')
# Define the columns to keep, all others will be deleted
desired_columns = {'account.display_name', 'account.indexable', 'account.url', 'content', 'created_at',
'external_replies_count', 'favourites_count', 'id', 'in_reply_to_id', 'local',
'max_boosts', 'max_faves', 'max_replies', 'most_toots', 'num_toots', 'preamble',
'reblogs_count', 'replies_count', 'required', 'self_reply_count', 'server',
'source', 'uri', 'url', 'userid'}
# Get the intersection of desired columns and actual columns
columns_to_keep = list(desired_columns.intersection(df.columns))
# Create new data frame with only desired columns, implicitly discarding all others
small_df = df[columns_to_keep]
return small_df
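A small usage sketch with a single made-up toot; real API responses carry many more fields, which toots2df() discards:

```python
from mastoscore.fetch import toots2df

toots = [{
    'id': 1,
    'uri': 'https://other.example.social/users/alice/statuses/1',
    'url': 'https://other.example.social/@alice/1',
    'content': '<p>Giant moth spotted! #monsterdon</p>',
    'created_at': '2025-10-19T20:05:00Z',
    'in_reply_to_id': None,
    'reblogs_count': 2, 'replies_count': 0, 'favourites_count': 5,
    'account': {'username': 'alice', 'display_name': 'Alice',
                'url': 'https://other.example.social/@alice'},
}]

df = toots2df(toots, 'https://server.example.com')
print(df[['userid', 'server', 'local', 'source']].to_string(index=False))
# userid is alice@other.example.social, server is other.example.social,
# local is False because the toot did not originate on server.example.com
```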
update_json(config, filename, results)
¶
Given a filename and a dictionary, read the JSON file, add the provided dictionary to it, then save the final result to the same location.
Source code in mastoscore/fetch.py
def update_json(config: ConfigParser, filename: str, results: dict) -> bool:
""" Given a filename and a dictionary, read the JSON file, add the
provided dictionary to it, then save the final result to the same
location.
"""
global logger
logger = logging.getLogger(__name__)
journalfile = config.get('fetch', 'journalfile')
base_name = f"data-{journalfile}-{filename}.json"
# Create directory structure
dir_path = create_journal_directory(config)
if not dir_path:
return False
# Create full file path
jfilename = join(dir_path, base_name)
try:
with open(jfilename, 'r') as jfile:
analysis = load( jfile )
logger.debug( f"Opened {jfilename} and read {len(analysis.keys())} keys from it.")
except (OSError, IOError) as e:
logger.critical(f"Failed to read {filename} analysis in {jfilename}")
logger.critical(e)
return False
except Exception as e:
logger.critical(f"Failed to read {filename} analysis in {jfilename}")
logger.critical(e)
return False
# append keys from results into analysis. Will overwrite if any are
# the same.
analysis.update( results )
try:
with open(jfilename, 'w') as jfile:
dump(analysis, jfile, cls=TimestampEncoder)
except (OSError, IOError) as e:
logger.critical(f"Failed to write {filename} analysis to {jfilename}")
logger.critical(e)
return False
except Exception as e:
logger.critical(f"Failed to write {filename} analysis to {jfilename}")
logger.critical(e)
return False
logger.debug( f"Now {len(analysis.keys())} keys.")
logger.info(f"wrote {journalfile}-{filename}.json")
return True
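A usage sketch showing the intended pairing with write_json(); the key names are illustrative:

```python
# write_json() creates (or replaces) the file...
write_json(config, 'analysis', {'top_poster': 'alice@other.example.social'})
# ...and update_json() merges additional keys into it without discarding what's there.
update_json(config, 'analysis', {'busiest_server': 'other.example.social'})
```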
write_journal(config, df, server)
¶
Takes a dataframe and the URL it represents, and calls
pandas.DataFrame.to_json()
to write it to a corresponding json journal file. Writes it to a file in a hierarchical
directory structure: journaldir/year/month/day/journalfile-server.json.
Parameters¶
- config: A ConfigParser object from the config module
- df: A Pandas DataFrame full of toots to write out.
- server: The api_base_url of a server to fetch from
Config Parameters Used¶
- fetch:journaldir: Base directory to write journal files into
- fetch:journalfile: Journal file template
- mastoscore:event_year: Year of the event (YYYY)
- mastoscore:event_month: Month of the event (MM)
- mastoscore:event_day: Day of the event (DD)
Returns¶
True if successful, False otherwise
Source code in mastoscore/fetch.py
def write_journal(config: ConfigParser, df: pd.DataFrame, server: str) -> bool:
"""
Take dataframe and the url it represents, and calls
[pandas.DataFrame.to_json()](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html)
to write it to a corresponding json journal file. Writes it to a file in a hierarchical
directory structure: `journaldir/year/month/day/journalfile-server.json`.
# Parameters
- **config**: A ConfigParser object from the [config](module-config.md) module
- **df**: A Pandas DataFrame full of toots to write out.
- **server**: The api_base_url of a server to fetch from
# Config Parameters Used
- fetch:journaldir: Base directory to write journal files into
- fetch:journalfile: Journal file template
- mastoscore:event_year: Year of the event (YYYY)
- mastoscore:event_month: Month of the event (MM)
- mastoscore:event_day: Day of the event (DD)
# Returns
True if successful, False otherwise
"""
global logger
logger = logging.getLogger(__name__)
journalfile = config.get('fetch', 'journalfile')
# Create directory structure
dir_path = create_journal_directory(config)
if not dir_path:
return False
# Create full file path
jfilename = join(dir_path, f"{journalfile}-{server}.json")
try:
df.to_json(jfilename, orient='records', date_format='iso',
date_unit='s')
logger.info(f"Wrote {len(df)} total toots to {jfilename}")
except Exception as e:
logger.critical(
f"Failed to write {len(df)} toots to {jfilename}")
logger.critical(e)
return False
return True
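A usage sketch, assuming df came from toots2df() and config is set up as shown earlier; the server name is illustrative:

```python
if write_journal(config, df, 'other.example.social'):
    print("wrote journaldir/YYYY/MM/DD/<journalfile>-other.example.social.json")
```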
write_json(config, filename, results)
¶
Given a config and a filename (which is a fragment, like 'analysis'), write the dictionary into a JSON file. This will overwrite any existing JSON file of the same name. Use update_json() to update without clobbering the original.
Source code in mastoscore/fetch.py
def write_json (config: ConfigParser, filename: str, results: dict) -> bool:
""" Given a config and a filename (which is a fragment, like 'analysis'), write
the dictionary into a JSON file. This will overwrite any existing JSON file
of the same name. Use update_json() to update without clobbering the original.
"""
global logger
logger = logging.getLogger(__name__)
journalfile = config.get('fetch', 'journalfile')
base_name = f"data-{journalfile}-{filename}.json"
# Create directory structure
dir_path = create_journal_directory(config)
if not dir_path:
return False
# Create full file path
rfilename = join(dir_path, base_name)
try:
with open(rfilename, '+w') as rfile:
dump(results, rfile, cls=TimestampEncoder)
except (OSError, IOError) as e:
logger.critical(f"Failed to write {filename} results to {rfilename}")
logger.critical(e)
return False
except Exception as e:
logger.critical(f"Failed to write {filename} results to {rfilename}")
logger.critical(e)
return False
logger.info(f"wrote {journalfile}-{filename}.json")
return True
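For example, fetch() calls this at the end of a run to record its summary; a trimmed sketch with made-up values:

```python
fresults = {
    'total_toots': 1234,
    'servers_done': ['https://example.social'],
    'servers_fail': [],
    'fetch_duration': '1 hour 2 minutes 3 seconds',
}
write_json(config, 'fetch', fresults)
# Writes data-<journalfile>-fetch.json under journaldir/year/month/day
```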