Module: fetch

The fetch module starts at the server where you're logged in and searches for a hashtag. After it has gathered all the toots your home server knows about, it looks at where those toots came from. For each remote server it finds mentioned, it calls fetch_hashtag_remote(). Each time it connects to a new server, it fetches every toot that server knows about for the hashtag, then looks at the servers mentioned in that batch and adds any new ones to the list of servers still to contact.
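
The crawl is effectively a breadth-first walk over servers. Here is a minimal sketch of that loop; fetch_one is a hypothetical stand-in for "ask one server for every toot it knows with this hashtag", and the real logic lives in fetch() and fetch_hashtag_remote() below.

def crawl(home_server: str, fetch_one) -> list:
    """Sketch only: collect toots from the home server, then from every server they mention."""
    def server_of(toot: dict) -> str:
        # https://example.net/users/alice/statuses/1 -> https://example.net
        return '/'.join(toot['uri'].split('/')[0:3])

    all_toots = list(fetch_one(home_server))      # toots our own server already knows about
    done = {home_server}
    todo = {server_of(t) for t in all_toots} - done
    while todo:
        server = todo.pop()
        done.add(server)
        new = fetch_one(server)                   # one connection per newly discovered server
        all_toots.extend(new)
        todo |= {server_of(t) for t in new} - done
    return all_toots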

Public API

This code depends on Mastodon.py and uses it to connect to the servers it finds mentioned. If you know anything about the fediverse, you know there is more out there than Mastodon servers: Pleroma, Akkoma, and various other ActivityPub-compatible servers. Some are derived from Mastodon and implement the same APIs; others don't. Some Mastodon servers offer public read APIs, others don't. Servers that allow public reads of their API will send you the details of their toots. Servers that don't allow public reads, or that don't implement a Mastodon-compatible timeline API, are quietly skipped.
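
For servers that do allow unauthenticated reads, the per-server fetch boils down to an anonymous Mastodon.py client and a hashtag timeline call. A hedged sketch (the real code goes through the Tooter class; try_public_hashtag is just an illustrative name):

from mastodon import Mastodon, MastodonError

def try_public_hashtag(api_base_url: str, hashtag: str) -> list:
    """Return public hashtag statuses, or [] if the server refuses or isn't Mastodon-compatible."""
    try:
        # No credentials: this only works on servers that expose public read APIs.
        client = Mastodon(api_base_url=api_base_url)
        return client.timeline_hashtag(hashtag, limit=40)
    except MastodonError:
        return []   # public reads disabled, or not a Mastodon-compatible API: skip quietly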

Directory Structure

Fetch organizes data into a directory hierarchy based on the date of the event. So if the journaldir is data, the hashtag is monsterdon, and the event date is 2025-10-19, then the directory is data/2025/10/19 and all the files have the monsterdon hashtag in their name. See the example below:

data
├── 2025
│  └── 10
│     ├── 19
│     │  ├── data-monsterdon-analysis.json
│     │  ├── data-monsterdon-fetch.json
│     │  ├── index.md
│     │  ├── monsterdon-20251019.png
│     │  ├── monsterdon-20251019.txt
│     │  ├── monsterdon-beige.party.json
│     │  ├── monsterdon-bolha.us.json
│     │  ... lots more files, one per server...
│     │  ├── wordcloud-monsterdon-20251019-remove.png
│     │  └── wordcloud-monsterdon-20251019-remove.txt

List of files

Every file has the hashtag and the date in its name. If you ran the same analysis on two different hashtags on the same day, none of the files would conflict, though they would all be stored in the same directory.
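
Concretely, the per-server journal files are named journalfile-server.json and the summary files data-journalfile-fragment.json, where journalfile carries the hashtag. A small illustration using the example values from the tree above:

from os.path import join

journaldir, journalfile = "data", "monsterdon"     # fetch:journaldir and fetch:journalfile
year, month, day = "2025", "10", "19"              # mastoscore:event_year / _month / _day

dir_path = join(journaldir, year, month, day)                       # data/2025/10/19
per_server = join(dir_path, f"{journalfile}-beige.party.json")      # one journal per server
summary = join(dir_path, f"data-{journalfile}-fetch.json")          # summary written by write_json()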

Module for fetching toots for a hashtag.

TimestampEncoder

Bases: JSONEncoder

A convenience JSONEncoder subclass that converts Pandas Timestamp objects to an ISO string

Source code in mastoscore/fetch.py
class TimestampEncoder(JSONEncoder):
    """ A convenience function that converts Pandas Timestamp objects to an ISO string """
    def default(self, o):
        if isinstance(o, Timestamp):
            return o.isoformat()
        if isinstance(o, pd.api.typing.NaTType):
            return "null"
        return super().default(o)
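
The encoder is passed as the cls argument when dumping results, as write_json() does further down. For example:

from json import dumps
from pandas import Timestamp

results = {"fetch_time": Timestamp("2025-10-19 20:00:00", tz="UTC"), "total_toots": 1234}
# Without cls=TimestampEncoder, dumps() would raise TypeError on the Timestamp value.
print(dumps(results, cls=TimestampEncoder))
# {"fetch_time": "2025-10-19T20:00:00+00:00", "total_toots": 1234}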

check_journaldir(dir_name)

Check if a directory exists and create it if it doesn't.

Parameters

  • dir_name (str): The name of the directory to check/create.

Returns:

bool: True if the directory exists (or was created) and is writable; False if it exists but is not a writable directory, or if creation failed.

Source code in mastoscore/fetch.py
def check_journaldir(dir_name: str) -> bool:
    """
    Check if a directory exists and create it if it doesn't.

    # Parameters
    - dir_name (str): The name of the directory to check/create.

    # Returns:
    bool: True if the directory exists (or was created) and is writable; False if it
       exists but is not a writable directory, or if creation failed.
    """
    global logger

    journaldir = abspath(dir_name)
    if exists(journaldir):
        if isdir(journaldir):
            if access(journaldir, W_OK):
                return True
            else:
                logger.critical(f"'{journaldir}' directory exists but is not writeable")
                return False
        else:
            logger.critical(f"Something already exists at '{journaldir}' but it is not a directory")
            return False
    else:
        try:
            makedirs(journaldir)
            logger.warning(f"Created '{journaldir}' successfully.")
            return True
        except Exception as e:
            logger.critical(f"Error creating directory '{journaldir}': {e}")
            return False
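
A typical call is just a guard before anything gets written; for example (the directory name here is illustrative):

# Bail out early if the journal directory is missing and can't be created.
if not check_journaldir("data"):
    raise SystemExit("journal directory is not usable")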

create_journal_directory(config)

Create a hierarchical directory structure for journal files.

Parameters

  • config: A ConfigParser object from the config module. The event year (YYYY), month (MM), and day (DD) are read from the mastoscore section, and the base directory from fetch:journaldir.

Returns

Full path to the created directory, or None if creation failed

Source code in mastoscore/fetch.py
def create_journal_directory(config:ConfigParser) -> str | None:
    """
    Create a hierarchical directory structure for journal files.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module. The event
      year (YYYY), month (MM), and day (DD) are read from the `mastoscore` section, and the
      base directory from `fetch:journaldir`.

    # Returns

    Full path to the created directory, or None if creation failed
    """
    global logger

    # Get date components from config
    year = config.get('mastoscore', 'event_year')
    month = config.get('mastoscore', 'event_month')
    day = config.get('mastoscore', 'event_day')
    base_dir = config.get('fetch', 'journaldir')

    # Create the full path
    dir_path = join(base_dir, year, month, day)
    dir_path = abspath(dir_path)

    # Check if directory exists
    if exists(dir_path):
        if isdir(dir_path):
            if access(dir_path, W_OK):
                return dir_path
            else:
                logger.critical(f"Directory '{dir_path}' exists but is not writeable")
                return None
        else:
            logger.critical(f"Path '{dir_path}' exists but is not a directory")
            return None

    # Create directory structure
    try:
        makedirs(dir_path, exist_ok=True)
        logger.debug(f"Created directory structure: '{dir_path}'")
        return dir_path
    except Exception as e:
        logger.critical(f"Error creating directory structure '{dir_path}': {e}")
        return None
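
A hedged example of driving this from a ConfigParser; the section and option names match the ones read above, and the literal values are only illustrative:

from configparser import ConfigParser

config = ConfigParser()
config.read_string("""
[mastoscore]
event_year = 2025
event_month = 10
event_day = 19

[fetch]
journaldir = data
""")

# Returns an absolute path ending in data/2025/10/19, or None if it can't be created.
dir_path = create_journal_directory(config)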

fetch(config)

This is the top-level function that will download toots and store them in a JSON cache. This function will create a tooter and login to the server named in the cred_file.

Parameters

  • config: A ConfigParser object from the config module

Config Parameters Used

  • fetch:lookback: Number of days to look back in time. Toots older than that are ignored
  • fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
  • fetch:max: Max number of toots to pull from a server (default: 2000)
  • fetch:hashtag: Hashtag to search for
  • fetch:journalfile: Base name for the journal files (combined with the server name)
  • fetch:debug: Numeric logging level for this module
  • fetch:dry_run: If True, we contact our home server, but make no remote connections. If False, do it for real.
  • fetch:api_base_url: Starting server for our first connection
  • fetch:cred_file: Implicitly used when we create our Tooter
  • fetch:overwrite: If True, overwrite files (re-fetch). If False and a file exists for a server, skip it. Default: False
  • analyse:timezone: Timezone used when recording the fetch start and end times
  • mastoscore:event_year: Year of the event (YYYY)
  • mastoscore:event_month: Month of the event (MM)
  • mastoscore:event_day: Day of the event (DD)

Returns

None

Source code in mastoscore/fetch.py
def fetch(config: ConfigParser) -> None:
    """
    This is the top-level function that will download toots and store them in a JSON cache. This
    function will create a [tooter](module-tooter.md) and login to the server named in the `cred_file`.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module

    # Config Parameters Used
    - fetch:lookback: Number of days to look back in time. Toots older than that are ignored
    - fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
    - fetch:max: Max number of toots to pull from a server (default: 2000)
    - fetch:hashtag: Hashtag to search for
    - fetch:journalfile: Base name for the journal files (combined with the server name)
    - fetch:debug: Numeric logging level for this module
    - fetch:dry_run: If True, we contact our home server, but make no remote connections. If False, do it for real.
    - fetch:api_base_url: Starting server for our first connection
    - fetch:cred_file: Implicitly used when we create our [Tooter](module-tooter.md)
    - fetch:overwrite: If True, overwrite files (re-fetch). If False and a file exists for a server, skip it. Default: False
    - analyse:timezone: Timezone used when recording the fetch start and end times
    - mastoscore:event_year: Year of the event (YYYY)
    - mastoscore:event_month: Month of the event (MM)
    - mastoscore:event_day: Day of the event (DD)

    # Returns

    None
    """
    global logger

    lookback = config.getint('fetch', 'lookback')
    journalfile = config.get('fetch', 'journalfile')
    maxtoots = config.getint('fetch', 'max')
    hashtag = config.get('fetch', 'hashtag')
    debug = config.getint('fetch', 'debug')
    dry_run = config.getboolean('fetch', 'dry_run')
    api_base_url = config.get('fetch', 'api_base_url')
    overwrite = config.getboolean('fetch', 'overwrite', fallback=False)
    timezone = pytimezone(config.get('analyse', 'timezone'))
    fresults = {}
    start_time = datetime.datetime.now(tz=timezone)
    end_time = datetime.datetime.now(tz=timezone)
    fetch_duration = datetime.timedelta(0)
    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(levelname)s\t%(message)s')
    logger.setLevel(debug)

    interval = datetime.timedelta(days=lookback)
    oldest_date = datetime.datetime.now(datetime.timezone.utc) - interval
    oldest_str = oldest_date.strftime("%Y-%m-%d")
    logger.debug(f"Lookback is {lookback} days, which is {oldest_str}")

    # Create directory structure
    dir_path = create_journal_directory(config)
    # Make sure we can write data before we try to fetch it
    if dir_path is None:
        return

    try:
        t = Tooter(config, 'fetch')
    except Exception as e:
        logger.critical(f"Failed to create Tooter for {api_base_url}")
        logger.critical(e)
        exit(1)

    logger.debug(
        f"Looking for at most {maxtoots} toots visible from {t.api_base_url} with #{hashtag} since {oldest_str}")

    toots = t.search_hashtag(hashtag, interval, maxtoots)
    if not toots:
        logger.error(
            "We found 0 toots for hashtag %s on %s", f"#{hashtag}", api_base_url)
        return
    else:
        logger.info(f"Found {len(toots)} local toots")
        df = toots2df(toots, api_base_url)
        write_journal(config, df, api_base_url.split('/')[2])

    # Look for non-local statuses. Let's figure out how many remote servers we need
    # to contact. This splits a URI like https://example.net/blah/blah/blah on slashes
    # takes the first 0-3 elements, and rejoins it on slashes. Produces https://example.net
    uris = ['/'.join(s.split('/')[0:3]) for s in df['uri']]

    # servers_done holds the list of servers that we've already contacted
    # servers_todo holds the list we still need to contact
    servers_done = set()
    servers_todo = set(uris)
    servers_fail = set()
    total_toots = len(df)
    try:
        # don't need to contact our own server, because we already got the local toots.
        servers_todo.remove(api_base_url)
    except Exception:
        logger.warning(
            f"api_base_url ({api_base_url}) wasn't in the set.")
    servers_done.add(api_base_url)

    if dry_run:
        # In a dry run, we don't reach out to remotes
        logger.info(
            f"We found {len(servers_todo)} remote servers, but dry_run is set, so we won't contact them")
        logger.info(f"Remotes: {servers_todo}")
        logger.info(
            f"Done! Found {len(toots)} total toots across {len(servers_done)} servers.")
        return
    else:
        del df
        del toots
    # Systematically reach out to each server and pull all the hashtag toots.
    # This will likely return some toots that did not appear in the original set,
    # because nobody on our local server follows the person on the other server.
    while len(servers_todo) > 0:
        uri = servers_todo.pop()
        # If we're not overwriting and this server's journal file already exists, skip it
        server = uri.split('/')[2]
        # amazonq-ignore-next-line
        jfilename = join(dir_path, f"{journalfile}-{server}.json")
        if not overwrite:
            if exists(jfilename) and (stat(jfilename).st_size > 0):
                logger.warning(f"{jfilename} exists, skipping")
                continue
        else:
            logger.warning(f"Overwriting {jfilename}!")

        newtoots = fetch_hashtag_remote(config, uri)
        servers_done.add(uri)
        if newtoots is None:
            logger.warning(f"Got no toots back from {uri}")
            servers_fail.add(uri)
            continue
        else:
            logger.info(
                f"Total {total_toots} after adding {len(newtoots)} toots from {uri}")
            try:
                df = toots2df(newtoots, uri)
            except Exception as e:
                logger.error(
                    f"Failed to convert {len(newtoots)} toots from {uri}")
                logger.error(e)
                continue
            if not write_journal(config, df, server):
                return
            total_toots = total_toots + len(df)
            del df
        # Did we find any new servers mentioned as a side-effect of fetching this
        # latest batch?
        newuris = ['/'.join(s['uri'].split('/')[0:3]) for s in newtoots]
        n = 0
        for server in set(newuris):
            if server not in servers_done and server not in servers_todo:
                servers_todo.add(server)
                n = n + 1
        logger.info(
            f"Added {n} new servers added by {uri}. Todo: {len(servers_todo)}, Done: {len(servers_done)}, Fail: {len(servers_fail)}")

    end_time = datetime.datetime.now(tz=timezone)
    fetch_start = start_time.strftime("%a %e %b %Y %H:%M:%S %Z")
    fetch_end = end_time.strftime("%a %e %b %Y %H:%M:%S %Z")
    fetch_duration = end_time - start_time
    duration_string = ""
    fetch_hours, fetch_seconds = divmod(int(fetch_duration.total_seconds()), 3600)
    if fetch_hours > 1:
        duration_string = f"{fetch_hours} hours"
    elif fetch_hours == 1:
        duration_string = f"{fetch_hours} hour"

    fetch_minutes, fetch_seconds = divmod(fetch_seconds, 60)
    if fetch_minutes > 1:
        duration_string = f"{duration_string} {fetch_minutes} minutes"
    elif fetch_minutes == 1:
        duration_string = f"{duration_string} {fetch_minutes} minute"

    if fetch_seconds > 1:
        duration_string = f"{duration_string} {fetch_seconds} seconds"
    elif fetch_seconds == 1:
        duration_string = f"{duration_string} {fetch_seconds} second"
    else:
        duration_string = f"{duration_string} exactly"

    fresults['total_toots'] = total_toots
    fresults['servers_done'] = list(servers_done)
    fresults['servers_fail'] = list(servers_fail)
    fresults['oldest_date'] = oldest_date.strftime("%a %e %b %Y %H:%M:%S %Z")
    fresults['fetch_time'] = fetch_start
    fresults['fetch_end'] = fetch_end
    fresults['fetch_duration'] = duration_string
    fresults['fetch_version'] = __version__

    write_json(config, 'fetch', fresults)
    logger.info(
        f"Done! Collected {total_toots} toots from {len(servers_done)} servers with {len(servers_fail)} failures.")

fetch_hashtag_remote(config, server)

Given the base URL of a server (derived from a toot's uri, like one returned by Mastodon.status), create a Tooter for that server. Connect and fetch the statuses. Returns a few fields, but not all.

Parameters

  • config: A ConfigParser object from the config module
  • server: The api_base_url of a server to fetch from

Config Parameters Used

  • fetch:lookback: Number of days to look back in time. Toots older than that are ignored
  • fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
  • fetch:max: Max number of toots to pull from a server (default: 2000)
  • fetch:hashtag: Hashtag to search for

Returns

List of statuses in the raw JSON format from the API, or None if the server could not be contacted or the fetch failed. Fields are not normalised or converted in any way. Since not all ActivityPub servers are exactly the same, it's not even certain which fields you will get.

Source code in mastoscore/fetch.py
def fetch_hashtag_remote(config, server: str) -> list | None:
    """
    Given the base URL of a server (derived from a toot's uri, like one returned by Mastodon.status),
    create a Tooter for that server. Connect and fetch the statuses. Returns a few fields, but not all.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module
    - **server**: The api_base_url of a server to fetch from

    # Config Parameters Used
    - fetch:lookback: Number of days to look back in time. Toots older than that are ignored
    - fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
    - fetch:max: Max number of toots to pull from a server (default: 2000)
    - fetch:hashtag: Hashtag to search for

    # Returns

    List of statuses in the raw JSON format from the API, or None if the server could not be
    contacted or the fetch failed. Fields are not normalised or converted in any way. Since not
    all ActivityPub servers are exactly the same, it's not even certain which fields you will get.
    """
    global logger

    lookback = config.getint('fetch', 'lookback')
    maxtoots = config.getint('fetch', 'max')
    hashtag = config.get('fetch', 'hashtag')

    interval = datetime.timedelta(days=lookback)

    # Make the tooter that will do the searching.
    try:
        t = Tooter(config, 'fetch', server)
        logger.info(f"Tooter created for {server}")
    except Exception as e:
        logger.warning(f"Failed to create Tooter for {server}")
        logger.warning(e)
        return None

    try:
        newtoots = t.search_hashtag(hashtag, interval, maxtoots)
    except Exception:
        logger.error(
            f"fetch_hashtag_remote: failure fetching {hashtag} from {server}.")
        return None
    return newtoots

read_json(config, filename)

Given a config and a filename (a fragment, like 'analysis'), figure out the path to the right JSON file, read it, and return its contents in a dict. Returns an empty dict if the journal directory cannot be created; if the file itself cannot be read, the error is logged and re-raised.

Source code in mastoscore/fetch.py
def read_json(config: ConfigParser, filename: str) -> dict:
    """ Given a config and a filename (a fragment, like 'analysis'), figure out the path
        to the right JSON file. Read the file, if it exists, and return its contents
        in a dict. Return empty dict if there are problems.
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get('fetch', 'journalfile')
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return {}

    jfilename = join(dir_path, base_name)
    try:
        with open(jfilename, 'r') as jfile:
            analysis = load( jfile )
        logger.debug( f"Opened {jfilename} and read {len(analysis.keys())} keys from it.")
    except Exception:
        logger.warning(f"Failed to read {filename} analysis in {jfilename}")
        raise

    return analysis

toots2df(toots, api_base_url)

Take in a list of toots from a Tooter object and turn it into a pandas DataFrame with the data normalized.

Parameters

  • toots: list. A list of toots in the same format as returned by the search_hashtag() API
  • api_base_url: string. Expected to include protocol, like https://server.example.com.

Returns

A Pandas DataFrame that contains all the toots normalised. Normalisation includes:

  • Converting date fields like created_at to timezone-aware datetime objects
  • Converting integer fields like reblogs_count to integers
  • Adding some columns (see below)
  • Discarding all but a few columns. So many different systems return different columns, and I'm only using a few of them. So I just discard everything else. This cuts down on storage and processing time.

Synthetic columns added:

  • server: The server part of the toot's uri: the server that owns the toot (for example other.example.social), which is not necessarily the server we fetched it from.
  • userid: The user's name in person@server.example.com format. Note it does not have the leading @ because tagging people is optional.
  • local: Boolean that is True if the toot comes from the api_base_url server. False otherwise.
  • source: The server part of api_base_url: the server we fetched the toot from. I might be talking to server.example.com, but they've sent me a copy of a toot from other.example.social; in that case source is server.example.com and server is other.example.social.

Source code in mastoscore/fetch.py
def toots2df(toots: list, api_base_url: str) -> pd.DataFrame:
    """
    Take in a list of toots from a tooter object, turn it into a
    pandas dataframe with a bunch of data normalized.

    # Parameters
    - toots: list. A list of toots in the same format as returned by the search_hashtag() API
    - api_base_url: string. Expected to include protocol, like `https://server.example.com`.

    # Returns
    A Pandas DataFrame that contains all the toots normalised. Normalisation includes:
    - Converting date fields like `created_at` to timezone-aware `datetime` objects
    - Converting integer fields like `reblogs_count` to integers
    - Adding some columns (see below)
    - Discarding all but a few columns. So many different systems return different columns, and I'm only
      using a few of them. So I just discard everything else. This cuts down on storage and processing time.

    # Synthetic columns added:
    - **server**: The server part of the toot's `uri`: the server that owns the toot (for example `other.example.social`), which is not necessarily the server we fetched it from.
    - **userid**: The user's name in `person@server.example.com` format. Note it does not have the leading `@` because tagging people is optional.
    - **local**: Boolean that is **True** if the toot comes from the `api_base_url` server. **False** otherwise.
    - **source**: The server part of `api_base_url`: the server we fetched the toot from. I might be talking to `server.example.com`, but they've sent me a copy of a toot from `other.example.social`; in that case `source` is `server.example.com` and `server` is `other.example.social`.
    """

    df = pd.json_normalize(toots)
    df['source'] = api_base_url.split('/')[2]
    df['local'] = [i.startswith(api_base_url) for i in df['uri']]
    # make a new "server" column off of uris
    df['server'] = [n.split('/')[2] for n in df['uri']]
    df['userid'] = df['account.username'] + '@' + df['server']
    df['reblogs_count'] = df['reblogs_count'].astype(int)
    df['replies_count'] = df['replies_count'].astype(int)
    df['favourites_count'] = df['favourites_count'].astype(int)
    df['created_at'] = pd.to_datetime(
        df['created_at'], utc=True, format='ISO8601')

    # Define the columns to keep, all others will be deleted
    desired_columns = {'account.display_name', 'account.indexable', 'account.url', 'content', 'created_at',
        'external_replies_count', 'favourites_count', 'id', 'in_reply_to_id', 'local',
        'max_boosts', 'max_faves', 'max_replies', 'most_toots', 'num_toots', 'preamble',
        'reblogs_count', 'replies_count', 'required', 'self_reply_count', 'server',
        'source', 'uri', 'url', 'userid'}
    # Get the intersection of desired columns and actual columns
    columns_to_keep = list(desired_columns.intersection(df.columns))

    # Create new data frame with only desired columns, implicitly discarding all others
    small_df = df[columns_to_keep]

    return small_df
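
A small, self-contained example of what goes in and what comes out; the field values are made up but follow the Mastodon status shape:

toots = [{
    "id": "113",
    "uri": "https://other.example.social/users/alice/statuses/113",
    "url": "https://other.example.social/@alice/113",
    "content": "<p>Watching tonight's movie #monsterdon</p>",
    "created_at": "2025-10-19T20:01:02Z",
    "reblogs_count": 3, "replies_count": 1, "favourites_count": 7,
    "in_reply_to_id": None,
    "account": {"username": "alice", "display_name": "Alice",
                "url": "https://other.example.social/@alice"},
}]

df = toots2df(toots, "https://server.example.com")
# server = "other.example.social"            (from the toot's uri)
# source = "server.example.com"              (the server we asked)
# userid = "alice@other.example.social"
# local  = False                             (the uri is not on server.example.com)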

update_json(config, filename, results)

Given a config, a filename fragment, and a dictionary, read the corresponding JSON file, merge the provided dictionary into it, then write the result back to the same location.

Source code in mastoscore/fetch.py
def update_json(config: ConfigParser, filename: str, results: dict) -> bool:
    """ Given a filename and a dictionary, read the JSON file, add the
        provided dictionary to it, then save the final result to the same
        location.
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get('fetch', 'journalfile')
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    jfilename = join(dir_path, base_name)
    try:
        with open(jfilename, 'r') as jfile:
            analysis = load( jfile )
        logger.debug( f"Opened {jfilename} and read {len(analysis.keys())} keys from it.")
    except (OSError, IOError) as e:
        logger.critical(f"Failed to read {filename} analysis in {jfilename}")
        logger.critical(e)
        return False
    except Exception as e:
        logger.critical(f"Failed to read {filename} analysis in {jfilename}")
        logger.critical(e)
        return False

    # append keys from results into analysis. Will overwrite if any are
    # the same.
    analysis.update( results )
    try:
        # Write the merged dictionary back out to the same file
        with open(jfilename, 'w') as jfile:
            dump( analysis, jfile, cls=TimestampEncoder )
    except (OSError, IOError) as e:
        logger.critical(f"Failed to write {filename} analysis to {jfilename}")
        logger.critical(e)
        return False
    except Exception as e:
        logger.critical(f"Failed to write {filename} analysis to {jfilename}")
        logger.critical(e)
        return False

    logger.debug( f"Now {len(analysis.keys())} keys.")
    logger.info(f"wrote {journalfile}-{filename}.json")
    return True

write_journal(config, df, server)

Takes a dataframe and the server it represents, and calls pandas.DataFrame.to_json() to write it to the corresponding JSON journal file, in a hierarchical directory structure: journaldir/year/month/day/journalfile-server.json.

Parameters

  • config: A ConfigParser object from the config module
  • df: A Pandas DataFrame full of toots to write out.
  • server: The hostname of the server the toots came from; used to build the journal file name

Config Parameters Used

  • fetch:journaldir: Base directory to write journal files into
  • fetch:journalfile: Journal file template
  • mastoscore:event_year: Year of the event (YYYY)
  • mastoscore:event_month: Month of the event (MM)
  • mastoscore:event_day: Day of the event (DD)

Returns

True if successful, False otherwise

Source code in mastoscore/fetch.py
def write_journal(config: ConfigParser, df: pd.DataFrame, server: str) -> bool:
    """
    Takes a dataframe and the server it represents, and calls
    [pandas.DataFrame.to_json()](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html)
    to write it to the corresponding JSON journal file, in a hierarchical
    directory structure: `journaldir/year/month/day/journalfile-server.json`.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module
    - **df**: A Pandas DataFrame full of toots to write out.
    - **server**: The hostname of the server the toots came from; used to build the journal file name

    # Config Parameters Used
    - fetch:journaldir: Base directory to write journal files into
    - fetch:journalfile: Journal file template
    - mastoscore:event_year: Year of the event (YYYY)
    - mastoscore:event_month: Month of the event (MM)
    - mastoscore:event_day: Day of the event (DD)

    # Returns

    True if successful, False otherwise
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get('fetch', 'journalfile')

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    jfilename = join(dir_path, f"{journalfile}-{server}.json")

    try:
        df.to_json(jfilename, orient='records', date_format='iso',
                   date_unit='s')
        logger.info(f"Wrote {len(df)} total toots to {jfilename}")
    except Exception as e:
        logger.critical(
            f"Failed to write {len(df)} toots to {jfilename}")
        logger.critical(e)
        return False
    return True
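
Inside fetch() this is called once per contacted server, with the DataFrame from toots2df() and the hostname part of the server's URL; roughly:

uri = "https://other.example.social"          # illustrative server URL
df = toots2df(newtoots, uri)                  # newtoots as returned by fetch_hashtag_remote()
write_journal(config, df, uri.split('/')[2])  # writes .../<journalfile>-other.example.social.json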

write_json(config, filename, results)

Given a config and a filename (which is a fragment, like 'analysis'), write the dictionary into a JSON file. This will overwrite any existing JSON file of the same name. Use update_json() to update without clobbering the original.

Source code in mastoscore/fetch.py
def write_json (config: ConfigParser, filename: str, results: dict) -> bool:
    """ Given a config and a filename (which is a fragment, like 'analysis'), write
        the dictionary into a JSON file. This will overwrite any existing JSON file
        of the same name. Use update_json() to update without clobbering the original.
    """
    global logger
    logger = logging.getLogger(__name__)

    journalfile = config.get('fetch', 'journalfile')
    base_name = f"data-{journalfile}-{filename}.json"

    # Create directory structure
    dir_path = create_journal_directory(config)
    if not dir_path:
        return False

    # Create full file path
    rfilename = join(dir_path, base_name)
    try:
        with open(rfilename, 'w') as rfile:
            dump(results, rfile, cls=TimestampEncoder)
    except (OSError, IOError) as e:
        logger.critical(f"Failed to write {filename} results to {rfilename}")
        logger.critical(e)
        return False
    except Exception as e:
        logger.critical(f"Failed to write {filename} results to {rfilename}")
        logger.critical(e)
        return False

    logger.info(f"wrote {journalfile}-{filename}.json")
    return True
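
Taken together, write_json(), update_json(), and read_json() give a simple round-trip for the summary files; a hedged example (assuming config is set up as in the earlier examples, and with made-up values):

# Creates (or overwrites) data-<journalfile>-fetch.json in the journal directory.
write_json(config, 'fetch', {'total_toots': 1234, 'fetch_version': '1.0'})

# Merges extra keys into the same file without clobbering what's already there.
update_json(config, 'fetch', {'servers_fail': []})

# Reads it back as a dict, e.g. {'total_toots': 1234, 'fetch_version': '1.0', 'servers_fail': []}
summary = read_json(config, 'fetch')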