Skip to content
This repository was archived by the owner on Jan 23, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b9866ea
[type-hints] Add type hints to fiber/config.py
calio Sep 3, 2020
968b887
[type-hints] Add type hints to fiber/core.py
calio Sep 4, 2020
779834a
[type-hints] Add type hints to fiber/backend.py
calio Sep 5, 2020
a84834c
[type-hints] Add type hints to fiber/init.py
calio Sep 5, 2020
1d6d68f
[type-hints] Add type hints to fiber/util.py
calio Sep 5, 2020
fc55faa
[type-hints] Add type hints to fiber/spawn.py
calio Sep 5, 2020
d593fd7
Make `Backend` an abstract base class
calio Sep 5, 2020
433926c
[type-hints] Add type hints to fiber/local_backend.py
calio Sep 5, 2020
e69b013
[type-hints] Add type hints to fiber/kubernetes_backend.py
calio Sep 5, 2020
54044d6
[type-hints] Add type hints to fiber/docker_backend.py
calio Sep 5, 2020
d934799
[type-hints] Add type hints to fiber/__init__.py
calio Sep 5, 2020
6d8f0b5
[type-hints] Add type hints to fiber/backend.py
calio Sep 5, 2020
896ec50
[type-hints] Add type hints to fiber/cli.py
calio Sep 5, 2020
4b10f7b
[type-hints] Add type hints to fiber/context.py
calio Sep 5, 2020
ad30a73
[type-hints] Add type hints to fiber/meta.py
calio Sep 5, 2020
6d633a4
[type-hints] Add type hints to fiber/popen_fiber_spawn.py
calio Sep 6, 2020
0e7cf77
`Backend.wait_for_job` should accept `None` as `timeout` argument
calio Sep 6, 2020
0496d38
[type-hints] Add type hints to fiber/process.py
calio Sep 6, 2020
75a544a
[type-hints] Add type hints to fiber/socket.py
calio Sep 6, 2020
7cdde55
[type-hints] Add type hints to fiber/queues.py
calio Sep 6, 2020
f976d8c
[type-hints] Add type hints to fiber/pool.py
calio Sep 6, 2020
aec2094
[type-hints] Add type hints to fiber/managers.py
calio Sep 7, 2020
6c9bf59
[type-hints] Add type hints to fiber/experimental/ring.py
calio Sep 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions fiber/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
from fiber import context
from fiber.init import init_fiber
from fiber.meta import meta
from typing import List
from typing import TYPE_CHECKING

__version__: str
_in_interactive_console: bool
_names: List[str]


__version__ = "0.2.1"
Expand Down Expand Up @@ -47,11 +53,11 @@
_in_interactive_console = False


def reset():
def reset() -> None:
init_fiber()


def init(**kwargs):
def init(**kwargs) -> None:
"""
Initialize Fiber. This function is called when you want to re-initialize
Fiber with new config values and also re-init loggers.
Expand All @@ -66,3 +72,15 @@ def init(**kwargs):
globals().update((name, getattr(context._default_context, name))
for name in _names)
__all__ = _names + []


if TYPE_CHECKING:
current_process = context.FiberContext.current_process
active_children = context.FiberContext.active_children
Process = context.FiberContext.Process
Manager = context.FiberContext.Manager
Pool = context.FiberContext.Pool
SimpleQueue = context.FiberContext.SimpleQueue
Pipe = context.FiberContext.Pipe
cpu_count = context.FiberContext.cpu_count
get_context = context.FiberContext.get_context
16 changes: 10 additions & 6 deletions fiber/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
import multiprocessing as mp

import fiber.config as config
from fiber.core import Backend

_backends: dict


_backends = {}
available_backend = ['kubernetes', 'docker', 'local']


def is_inside_kubenetes_job():
def is_inside_kubenetes_job() -> bool:
if os.environ.get("KUBERNETES_SERVICE_HOST", None):
return True
return False


def is_inside_docker_job():
def is_inside_docker_job() -> bool:
if os.environ.get("FIBER_BACKEND", "") == "docker":
return True
return False
Expand All @@ -42,7 +45,7 @@ def is_inside_docker_job():
}


def auto_select_backend():
def auto_select_backend() -> str:
for backend_name, test in BACKEND_TESTS.items():
if test():
name = backend_name
Expand All @@ -53,7 +56,7 @@ def auto_select_backend():
return name


def get_backend(name=None, **kwargs):
def get_backend(name=None, **kwargs) -> Backend:
"""
Returns a working Fiber backend. If `name` is specified, returns a
backend specified by `name`.
Expand All @@ -70,7 +73,8 @@ def get_backend(name=None, **kwargs):

_backend = _backends.get(name, None)
if _backend is None:
_backend = importlib.import_module("fiber.{}_backend".format(
name)).Backend(**kwargs)
backend_name = "fiber.{}_backend".format(name)
backend_module = importlib.import_module(backend_name)
_backend = backend_module.Backend(**kwargs) # type: ignore
_backends[name] = _backend
return _backend
82 changes: 55 additions & 27 deletions fiber/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,27 @@
import fiber
import fiber.core as core
from fiber.core import ProcessStatus
import pathlib
from typing import Any, List, Tuple, TypeVar, Optional, Union, Dict

_T0 = TypeVar("_T0")
_TDockerImageBuilder = TypeVar(
"_TDockerImageBuilder", bound="DockerImageBuilder"
)
CONFIG: Dict[str, str]


CONFIG = {}


def get_backend(platform):
def get_backend(platform: str) -> fiber.kubernetes_backend.Backend:
from fiber.kubernetes_backend import Backend as K8sBackend

backend = K8sBackend(incluster=False)
return backend


def find_docker_files():
def find_docker_files() -> List[pathlib.Path]:
"""Find all possible docker files on current directory."""
p = Path(".")
q = p / "Dockerfile"
Expand All @@ -58,7 +67,7 @@ def find_docker_files():
return files


def select_docker_file(files):
def select_docker_file(files: List[pathlib.Path]) -> pathlib.Path:
"""Ask user which docker file to use and return a PurePath object."""
num = 0
n = len(files)
Expand Down Expand Up @@ -89,7 +98,7 @@ def select_docker_file(files):
return files[num]


def get_default_project_gcp():
def get_default_project_gcp() -> str:
"""Get default GCP project name."""
name = sp.check_output(
"gcloud config list --format 'value(core.project)' 2>/dev/null",
Expand All @@ -98,7 +107,7 @@ def get_default_project_gcp():
return name.decode("utf-8").strip()


def parse_file_path(path):
def parse_file_path(path: str) -> Tuple[Optional[str], str]:
parts = path.split(":")
if len(parts) == 1:
return (None, path)
Expand All @@ -112,7 +121,7 @@ def parse_file_path(path):
@click.command()
@click.argument("src")
@click.argument("dst")
def cp(src, dst):
def cp(src: str, dst: str) -> None:
"""Copy file from a persistent storage"""
platform = CONFIG["platform"]

Expand All @@ -125,12 +134,14 @@ def cp(src, dst):
)

if parts_src[0]:
volume = parts_src[0]
elif parts_dst[0]:
volume = parts_dst[0]
parsed_volume = parts_src[0]
else:
parsed_volume = parts_dst[0]

if parsed_volume is None:
raise ValueError("Must copy/to from a persistent volume")

volume:str = parsed_volume
k8s_backend = get_backend(platform)

job_spec = core.JobSpec(
Expand All @@ -140,6 +151,9 @@ def cp(src, dst):
volumes={volume: {"mode": "rw", "bind": "/persistent"}},
)
job = k8s_backend.create_job(job_spec)
if job.data is None:
raise RuntimeError("Failed to create a new job for data copying")

pod_name = job.data.metadata.name

print("launched pod: {}".format(pod_name))
Expand Down Expand Up @@ -170,7 +184,7 @@ def cp(src, dst):
# k8s_backend.terminate_job(job)


def detect_platforms():
def detect_platforms() -> List[str]:
commands = ["gcloud", "aws"]
platforms = ["gcp", "aws"]
found_platforms = []
Expand All @@ -186,7 +200,7 @@ def detect_platforms():
return found_platforms


def prompt_choices(choices, prompt):
def prompt_choices(choices: List[_T0], prompt: str) -> _T0:
num = 0
n = len(choices)

Expand Down Expand Up @@ -216,13 +230,13 @@ def prompt_choices(choices, prompt):


class DockerImageBuilder:
def __init__(self, registry=""):
def __init__(self, registry: str = "") -> None:
self.registry = registry

def get_docker_registry_image_name(image_base_name):
def get_docker_registry_image_name(self, image_base_name: str) -> str:
return image_base_name

def build(self):
def build(self) -> str:
files = find_docker_files()
n = len(files)
if n == 0:
Expand All @@ -248,25 +262,26 @@ def build(self):

return self.full_image_name

def tag(self):
def tag(self) -> str:
self.full_image_name = self.image_name
return self.full_image_name

def push(self):
def push(self) -> None:
sp.check_call(
"docker push {}".format(self.full_image_name), shell=True,
)

def docker_tag(self, in_name, out_name):
def docker_tag(self, in_name: str, out_name: str) -> None:
sp.check_call("docker tag {} {}".format(in_name, out_name), shell=True)


class AWSImageBuilder(DockerImageBuilder):
def __init__(self, registry):
def __init__(self, registry: str) -> None:
self.registry = registry
parts = registry.split(".")
self.region = parts[-3]

def tag(self):
def tag(self) -> str:
image_name = self.image_name
full_image_name = "{}/{}".format(self.registry, self.image_name)

Expand All @@ -275,7 +290,7 @@ def tag(self):
self.full_image_name = full_image_name
return full_image_name

def need_new_repo(self):
def need_new_repo(self) -> bool:
output = sp.check_output(
"aws ecr describe-repositories --region {}".format(self.region),
shell=True,
Expand All @@ -293,7 +308,7 @@ def need_new_repo(self):

return True

def create_repo_if_needed(self):
def create_repo_if_needed(self) -> None:
if self.need_new_repo():
sp.check_call(
"aws ecr create-repository --region {} --repository-name {}".format(
Expand All @@ -304,7 +319,7 @@ def create_repo_if_needed(self):

return

def push(self):
def push(self) -> None:
self.create_repo_if_needed()

try:
Expand All @@ -320,10 +335,10 @@ def push(self):


class GCPImageBuilder(DockerImageBuilder):
def __init__(self, registry="gcr.io"):
def __init__(self, registry: str = "gcr.io") -> None:
self.registry = registry

def tag(self):
def tag(self) -> str:
image_name = self.image_name
proj = get_default_project_gcp()

Expand All @@ -343,7 +358,15 @@ def tag(self):
@click.option("--memory")
@click.option("-v", "--volume")
@click.argument("args", nargs=-1)
def run(attach, image, gpu, cpu, memory, volume, args):
def run(
attach: bool,
image: str,
gpu: int,
cpu: int,
memory: int,
volume: str,
args: List[str],
) -> int:
"""Run a command on a kubernetes cluster with fiber."""
platform = CONFIG["platform"]
print(
Expand All @@ -352,6 +375,8 @@ def run(attach, image, gpu, cpu, memory, volume, args):
)
)

builder: DockerImageBuilder

if image:
full_image_name = image
else:
Expand Down Expand Up @@ -393,6 +418,9 @@ def run(attach, image, gpu, cpu, memory, volume, args):
job_spec.volumes = volumes

job = k8s_backend.create_job(job_spec)
if job.data is None:
raise RuntimeError("Failed to create a new job")

pod_name = job.data.metadata.name
exitcode = 0

Expand All @@ -414,7 +442,7 @@ def run(attach, image, gpu, cpu, memory, volume, args):
return 0


def auto_select_platform():
def auto_select_platform() -> str:
platforms = detect_platforms()
if len(platforms) > 1:
choice = prompt_choices(
Expand All @@ -438,7 +466,7 @@ def auto_select_platform():
"--gcp", is_flag=True, help="Run commands on Google Cloud Platform"
)
@click.version_option(version=fiber.__version__, prog_name="fiber")
def main(docker_registry, aws, gcp):
def main(docker_registry: str, aws: bool, gcp: bool) -> None:
"""fiber command line tool that helps to manage workflow of distributed
fiber applications.
"""
Expand Down
Loading