Skip to content
Snippets Groups Projects
Verified Commit 99cc35e7 authored by Janne Mareike Koschinski's avatar Janne Mareike Koschinski
Browse files

feat: add status output when used in a terminal

parent 1dec25f1
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,8 @@ import argparse
import os
from typing import Iterator, List
from customio.StatusPrinter import StatusPrinter
from customio.streams import stdout
from files.FileFinder import FileFinder
from files.FileMover import FileMover
from models.FileMove import FileMove
......@@ -10,22 +12,30 @@ from models.FileMove import FileMove
def process_folders(folders: List[str]) -> Iterator[FileMove]:
for foldername in folders:
stdout().status("Reading folder… {}".format(foldername))
folder = os.path.abspath(foldername)
finder = FileFinder(folder)
mover = FileMover(folder)
status = StatusPrinter(stdout(), "Reading Files", finder.count())
for file in finder.all():
move = mover.move(file)
status.update(os.path.relpath(file, foldername))
if move is not None:
yield move
if __name__ == "__main__":
stdout().status("Initializing…")
parser = argparse.ArgumentParser()
parser.add_argument("folders", nargs="+", help="Folders to run the task on")
parser.add_argument("--dry-run", help="Simulate renames", action="store_true")
args = parser.parse_args()
moves = list(process_folders(args.folders))
statusTmp = StatusPrinter(stdout(), "Creating temporary files", len(moves))
for move in moves:
statusTmp.update(move.source)
move.move_intermediate(args.dry_run)
statusFinal = StatusPrinter(stdout(), "Writing files", len(moves))
for move in moves:
statusTmp.update(move.source)
move.move_target(args.dry_run)
import io
from customio.Output import Output
class OtherOutput(Output):
file: io.TextIOBase
def __init__(self, file: io.TextIOBase):
self.file = file
def status(self, data: str):
pass
def log(self, data: str):
print(data, file=self.file)
# noinspection PyMethodMayBeStatic
class Output:
def status(self, data: str):
"""
Print a temporary line of data
:param data: content to print
:return: Nothing
"""
pass
def log(self, data: str):
"""
Print a line of data
:param data: content to print
:return: Nothing
"""
pass
from customio import Output
class StatusPrinter:
task: str
index: int
total: int
def __init__(self, output: Output, task: str, total: int):
self.output = output
self.task = task
self.index = 0
self.total = total
def update(self, file: str):
data = self.task + ''
if self.total != 0:
data += "{} / {} ".format(self.index, self.total)
if file != '':
data += str(file)
self.output.status(data)
self.index += 1
import io
import shutil
from urwid import str_util
from customio.Output import Output
class TerminalOutput(Output):
file: io.TextIOBase
def __init__(self, file: io.TextIOBase):
self.file = file
def status(self, text: str):
print("\x1B[K", end='')
print(TerminalOutput.__truncate_terminal(text), end='\r', flush=True, file=self.file)
def log(self, data: str, error: bool = False):
if self.file.isatty():
print("\x1B[K", end="", file=self.file)
if error:
print("\x1B[1;31m", end="", file=self.file)
print(data, file=self.file)
@staticmethod
def __truncate_terminal(text: str) -> str:
columns = shutil.get_terminal_size((-1, -1)).columns
if columns <= 0:
return text
else:
return TerminalOutput.__truncate_width(text, columns)
@staticmethod
def __truncate_width(text, length):
result = ""
width = 0
for char in text:
charwidth = str_util.get_width(ord(char))
if width + charwidth >= length:
break
result += char
width += charwidth
return result
import io
import sys
from customio.Output import Output
from customio.OtherOutput import OtherOutput
from customio.TerminalOutput import TerminalOutput
def get_output(target: io.TextIOBase) -> Output:
if target.isatty():
return TerminalOutput(target)
else:
return OtherOutput(target)
def stdout():
# noinspection PyTypeChecker
return get_output(sys.stdout)
def stderr():
# noinspection PyTypeChecker
return get_output(sys.stderr)
......@@ -12,3 +12,9 @@ class FileFinder:
for subdir, dirs, files in os.walk(self.folder):
for filename in files:
yield os.path.join(self.folder, subdir, filename)
def count(self) -> int:
count = 0
for subdir, dirs, files in os.walk(self.folder):
count += len(files)
return count
......@@ -2,6 +2,7 @@ import os
from typing import Optional
from uuid import uuid4
from customio.streams import stderr
from extractors.get_extractor import get_extractor
from files.FileMetaParser import FileMetaParser
from models.FileMeta import FileMeta
......@@ -38,15 +39,15 @@ class FileMover:
def move(self, name: str) -> Optional[FileMove]:
extractor = get_extractor(name)
if extractor is None:
print("No Extractor for file: {0}".format(name))
stderr().log("No Extractor for file: {0}".format(name))
return None
track_meta = extractor.extract_tags()
if track_meta is None:
print("No Metadata for file: {0}".format(name))
stderr().log("No Metadata for file: {0}".format(name))
return None
file_meta = FileMetaParser(track_meta).get_meta()
if file_meta is None:
print("Metadata for file not valid: {0}".format(name))
stderr().log("Metadata for file not valid: {0}".format(name))
return None
target = self.__get_target_path(name, file_meta)
......
import os
from customio.streams import stdout
class FileMove:
source: str
......@@ -20,9 +22,9 @@ class FileMove:
def move_target(self, dry_run: bool = True):
if dry_run:
print("Moving File:")
print(" ← {0}".format(self.source))
print(" → {0}".format(self.target))
stdout().log("Moving File:")
stdout().log(" ← {0}".format(self.source))
stdout().log(" → {0}".format(self.target))
else:
target_folder = os.path.dirname(self.target)
if not os.path.exists(target_folder):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment