The pipeline starts when pipeline part A places a new file in a watched folder. The DirectoryWatcher is responsible for calling a callback function whenever a new file is created in that folder.
The DirectoryWatcher provides a simple way to monitor a specified directory for file creation events and execute asynchronous callbacks in response. It uses the watchdog library for filesystem monitoring and integrates with asyncio for handling asynchronous tasks. Furthermore, the DirectoryWatcher uses threading.
NOTE: Threading is used so that watching the directory does not block code running on the main thread.
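The combination of a background thread and asyncio described above can be sketched as follows. This is a minimal illustration and not the actual DirectoryWatcher implementation: a plain `threading.Thread` stands in for the watchdog observer thread, and the names `on_new_file`, `watcher_thread`, and `main` are hypothetical. It shows one common way to hand work from a thread to a running event loop, using `asyncio.run_coroutine_threadsafe`.

```python
import asyncio
import threading


async def on_new_file(file_path: str) -> str:
    # Hypothetical async callback; a real one might parse or upload the file.
    return "New file created in " + file_path


def watcher_thread(loop, paths, results):
    # Runs in a background thread, much like a filesystem observer would.
    for path in paths:
        # Schedule the coroutine on the event loop owned by another thread.
        future = asyncio.run_coroutine_threadsafe(on_new_file(path), loop)
        # Block this worker thread (not the loop) until the callback finishes.
        results.append(future.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    results = []
    worker = threading.Thread(
        target=watcher_thread,
        args=(loop, ["a.txt", "b.txt"], results),
        daemon=True,
    )
    worker.start()
    # Wait for the thread in an executor so the loop stays free to run callbacks.
    await loop.run_in_executor(None, worker.join)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the worker thread only schedules coroutines and waits for their results, the event loop is never blocked by the monitoring itself.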
# Importing
from fastapi import FastAPI
from lib.DirectoryWatcher import DirectoryWatcher

app = FastAPI()  # assumed here; the original snippet uses `app` without defining it
dirPath = "some/path/to/a/directory"

# Setup
async def newFileCreated(file_path: str):
    print("New file created in " + file_path)

dirWatcher = DirectoryWatcher(
    directory=dirPath, async_callback=newFileCreated
)

# A FastAPI event function running on startup
@app.on_event("startup")
async def startEvent():
    dirWatcher.start_watching()

# A FastAPI event function running on shutdown
@app.on_event("shutdown")
def shutdown_event():
    dirWatcher.stop_watching()
NOTE: The FastAPI event functions are not required in order to use the DirectoryWatcher.
def __init__(self, directory, async_callback):
some/path/to/a/directory
def start_watching(self) -> threading.Thread:
def stop_watching(self):
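To make the three methods above concrete, here is a rough stand-in that mirrors the documented interface and is used from a plain script without FastAPI. Everything in it is an assumption made for illustration: the class name `PollingDirectoryWatcher`, the `interval` parameter, and the polling approach are hypothetical, and it uses only the standard library instead of watchdog. It also runs each callback with `asyncio.run` inside the worker thread, which may differ from how the real DirectoryWatcher dispatches callbacks.

```python
import asyncio
import os
import tempfile
import threading
import time


class PollingDirectoryWatcher:
    """Hypothetical stand-in for DirectoryWatcher that polls instead of using watchdog."""

    def __init__(self, directory, async_callback, interval=0.05):
        self.directory = directory
        self.async_callback = async_callback
        self.interval = interval  # assumed parameter, not part of the documented API
        self._stop = threading.Event()
        self._thread = None
        self._seen = set()

    def _run(self):
        while not self._stop.is_set():
            current = set(os.listdir(self.directory))
            for name in sorted(current - self._seen):
                path = os.path.join(self.directory, name)
                # Run the async callback to completion inside this worker thread.
                asyncio.run(self.async_callback(path))
            self._seen = current
            self._stop.wait(self.interval)

    def start_watching(self) -> threading.Thread:
        # Snapshot existing files so only files created later trigger the callback.
        self._seen = set(os.listdir(self.directory))
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        return self._thread

    def stop_watching(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()


def demo():
    created = []

    async def new_file_created(file_path: str):
        created.append(os.path.basename(file_path))

    with tempfile.TemporaryDirectory() as dir_path:
        watcher = PollingDirectoryWatcher(directory=dir_path, async_callback=new_file_created)
        watcher.start_watching()
        with open(os.path.join(dir_path, "example.txt"), "w") as handle:
            handle.write("hello")
        # Wait (up to 5 seconds) for the watcher to notice the new file.
        deadline = time.monotonic() + 5
        while not created and time.monotonic() < deadline:
            time.sleep(0.05)
        watcher.stop_watching()
    return created


if __name__ == "__main__":
    print(demo())
```

Calling `stop_watching()` before the watched directory is removed ensures the worker thread has finished and will not poll a path that no longer exists.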