Skip to content

API

Provides an abstraction for the GCE provisioner.

Required Environment Variables
  • GOOGLE_APPLICATION_SERVICE_ACCOUNT
  • GOOGLE_APPLICATION_CREDENTIALS
  • GOOGLE_PROJECT
  • GOOGLE_DATACENTER

options property

options: t.Mapping[str, str]

connect

connect() -> NodeDriver
Source code in ogc/provision.py
@retry(tries=5, logger=None)
def connect(self) -> NodeDriver:
    """Build a GCE driver instance from this provisioner's options.

    The call is wrapped by ``retry`` with ``tries=5``.

    Returns:
        The object produced by calling the GCE driver class with
        ``self.options`` expanded as keyword arguments.
    """
    log.info("Establishing provider connection...")
    # NOTE(review): the options are logged verbatim; if they carry credential
    # material, confirm debug logs are never shipped anywhere sensitive.
    log.debug(self.options)
    driver_cls = get_driver(Provider.GCE)
    return driver_cls(**self.options)

destroy

destroy(nodes: list[MachineModel]) -> bool
Source code in ogc/provision.py
def destroy(self, nodes: list[MachineModel]) -> bool:
    """Destroy the given machines together with their boot disks.

    Args:
        nodes: Machines to remove; the result of each one's ``state()``
            call is what gets handed to the driver.

    Returns:
        True only when every entry returned by the driver is exactly ``True``.
    """
    targets = [machine.state() for machine in nodes]
    outcomes = self.provisioner.ex_destroy_multiple_nodes(  # type: ignore
        node_list=targets, destroy_boot_disk=True
    )
    return all(outcome is True for outcome in outcomes)

setup

setup() -> None
Source code in ogc/provision.py
def setup(self) -> None:
    """Create the layout's firewall when the layout declares any ports.

    A missing or empty tag list on the layout is passed on as ``[]``.
    """
    layout = self.layout
    firewall_tags = layout.tags or []
    if not layout.ports:
        return
    self.create_firewall(layout.name, layout.ports, firewall_tags)

cleanup

cleanup(
    node: MachineModel, **kwargs: t.Mapping[str, t.Any]
) -> bool
Source code in ogc/provision.py
def cleanup(self, node: MachineModel, **kwargs: t.Mapping[str, t.Any]) -> bool:
    """Report a successful cleanup without performing any work.

    Args:
        node: Unused.
        **kwargs: Unused.

    Returns:
        Always ``True``.
    """
    return True

image

image(runs_on: str) -> NodeImage
Source code in ogc/provision.py
def image(self, runs_on: str) -> NodeImage:
    """Resolve ``runs_on`` to a GCE image.

    The name is first tried as an image family. If that lookup raises
    ``ResourceNotFoundError`` or yields nothing, the name is translated
    through ``CLOUD_IMAGE_MAP["google"]["amd64"]`` and matched against the
    names of ``self.images()``.

    Args:
        runs_on: Image family name, or a key of the internal image map.

    Returns:
        The family image when found, otherwise the first listed image whose
        name equals the mapped name.

    Raises:
        ProvisionException: If ``runs_on`` has no entry in the image map, or
            no listed image carries the mapped name.
    """
    # Pull from partial first
    try:
        family_image: NodeImage = self.provisioner.ex_get_image_from_family(runs_on)  # type: ignore
    except ResourceNotFoundError:
        log.debug(f"Could not find {runs_on}, falling back internal image map")
    else:
        if family_image:
            return family_image

    mapped_name = CLOUD_IMAGE_MAP["google"]["amd64"].get(runs_on)
    if mapped_name is None:
        # Fail early and name what the caller asked for, rather than listing
        # every image only to report "Could not determine image for None".
        raise ProvisionException(f"Could not determine image for {runs_on}")

    for candidate in self.images():
        if candidate.name == mapped_name:
            return candidate
    raise ProvisionException(f"Could not determine image for {mapped_name}")

create_firewall

create_firewall(
    name: str, ports: list[str], tags: list[str]
) -> None
Source code in ogc/provision.py
def create_firewall(self, name: str, ports: list[str], tags: list[str]) -> None:
    """Ensure a firewall called ``name`` exists, creating it when missing.

    Args:
        name: Firewall name to look up and, if absent, create.
        ports: Port entries; only the text before the first ``":"`` of each
            entry is used for the rule.
        tags: Passed to the driver as ``target_tags`` on creation.
    """
    allowed_ports = [entry.split(":")[0] for entry in ports]
    try:
        self.provisioner.ex_get_firewall(name)  # type: ignore
    except ResourceNotFoundError:
        log.warning("No firewall found, will create one to attach nodes to.")
        tcp_rule = {"IPProtocol": "tcp", "ports": allowed_ports}
        self.provisioner.ex_create_firewall(  # type: ignore
            name, [tcp_rule], target_tags=tags
        )

delete_firewall

delete_firewall(name: str) -> None
Source code in ogc/provision.py
def delete_firewall(self, name: str) -> None:
    """Look up the firewall called ``name`` and destroy it.

    A ``ResourceNotFoundError`` from either driver call is logged as an
    error and not re-raised.
    """
    try:
        firewall = self.provisioner.ex_get_firewall(name)  # type: ignore
        self.provisioner.ex_destroy_firewall(firewall)  # type: ignore
    except ResourceNotFoundError:
        log.error(f"Unable to delete firewall {name}")

list_firewalls

list_firewalls() -> list[str]
Source code in ogc/provision.py
def list_firewalls(self) -> list[str]:
    """Return the result of the driver's ``ex_list_firewalls`` call unchanged."""
    firewalls = self.provisioner.ex_list_firewalls()  # type: ignore
    return firewalls

create

create() -> list[MachineModel] | None
Source code in ogc/provision.py
def create(self) -> list[MachineModel] | None:
    """Provision ``layout.scale`` GCE instances and record each one locally.

    Steps, in order: resolve the image and size, build instance metadata
    (SSH key plus startup script), make sure the firewall exists when ports
    are declared, extend the layout's tags, create the nodes, then pickle
    each node into the cache directory and save a ``MachineModel`` for it.

    Environment:
        OGC_DISABLE_SPOT: When set to anything other than an empty string,
            ``0``, ``false``, ``no`` or ``off`` (case-insensitive),
            instances are created as non-preemptible. Unset means
            preemptible.
        USER: Used for the ``user-<name>`` tag; defaults to ``ogc``.

    Returns:
        One saved ``MachineModel`` per node returned by the driver.

    Raises:
        ProvisionException: If neither an image nor a username is available.
    """
    layout = self.layout
    image = self.image(layout.runs_on)
    if not image and not layout.username:
        raise ProvisionException(
            f"Could not locate AMI and/or username for: {layout.runs_on}"
        )
    size = self.sizes(layout.instance_size)[0]

    public_key = Path(layout.ssh_public_key).expanduser().read_text().strip()
    # Windows images get an empty startup script instead of the userdata.
    startup_script = self._userdata() if "windows" not in layout.runs_on else ""
    ex_metadata = {
        "items": [
            {"key": "ssh-keys", "value": f"{layout.username}: {public_key}"},
            {"key": "startup-script", "value": startup_script},
        ]
    }

    if layout.ports:
        self.create_firewall(layout.name, layout.ports, layout.tags)

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc).strftime("created-%Y-%m-%d")
    if layout.tags:
        layout.tags.append(now)
        layout.tags.append(f"user-{os.environ.get('USER', 'ogc')}")
        # Store some extra metadata similar to what other projects use
        layout.tags.append("environment-ogc")
        layout.tags.append("repo-ogc")

    # The previous os.environ.get("OGC_DISABLE_SPOT", True) yielded a truthy
    # string whenever the variable was set, so spot could never be disabled.
    disable_spot = os.environ.get("OGC_DISABLE_SPOT", "").strip().lower()
    use_preemptible = disable_spot in ("", "0", "false", "no", "off")

    suffix = str(uuid.uuid4())[:4]
    opts = dict(
        base_name=f"ogc-{layout.name}-{suffix}",
        image=image,
        size=size,
        number=layout.scale,
        ex_metadata=ex_metadata,
        ex_tags=layout.tags,
        ex_labels=layout.labels,
        ex_disk_type="pd-ssd",
        ex_disk_size=100,
        ex_preemptible=use_preemptible,
    )
    _nodes = self.provisioner.ex_create_multiple_nodes(**opts)  # type: ignore
    _machines = []
    for node in _nodes:
        # Keep a pickled copy of the driver node next to the machine record.
        state_file_p = db.cache_path() / node.id
        state_file_p.write_bytes(db.model_as_pickle(node))
        machine = MachineModel(
            layout=layout,
            instance_name=node.name,
            instance_id=node.id,
            instance_state=node.state,
            public_ip=node.public_ips[0],
            private_ip=node.private_ips[0],
            remote_state_file=state_file_p.resolve(),
        )
        machine.save()
        _machines.append(machine)
    return _machines

node

node(**kwargs: dict[str, object]) -> Node | None
Source code in ogc/provision.py
def node(self, **kwargs: dict[str, object]) -> Node | None:
    """Find a node by id among everything the driver lists.

    Args:
        **kwargs: Only ``instance_id`` is consulted; when absent the lookup
            compares against ``None``.

    Returns:
        The first listed node whose ``id`` equals ``instance_id``, or
        ``None`` when nothing matches.
    """
    candidates = self.provisioner.list_nodes()
    wanted_id = kwargs.get("instance_id")
    return next((item for item in candidates if item.id == wanted_id), None)

__str__

__str__() -> str
Source code in ogc/provision.py
def __str__(self) -> str:
    """Return a short label containing the configured ``datacenter`` option."""
    datacenter = self.options["datacenter"]
    return f"<GCEProvisioner [{datacenter}]>"

AWS Provisioner

Required Environment Variables
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
Optional Environment Variables
  • AWS_REGION

options property

options: t.Mapping[str, str]

connect

connect() -> NodeDriver
Source code in ogc/provision.py
@retry(delay=5, tries=10, jitter=(5, 25), logger=None)
def connect(self) -> NodeDriver:
    """Build an EC2 driver instance from this provisioner's options.

    The call is wrapped by ``retry`` with ``delay=5``, ``tries=10`` and
    ``jitter=(5, 25)``.

    Returns:
        The object produced by calling the EC2 driver class with
        ``self.options`` expanded as keyword arguments.
    """
    driver_cls = get_driver(Provider.EC2)
    connection = driver_cls(**self.options)
    return connection

setup

setup() -> None
Source code in ogc/provision.py
def setup(self) -> None:
    """Prepare the firewall and key pair named after the layout.

    The firewall step runs only when the layout declares ports. A key pair
    is created only when no listed key pair already carries the layout's
    name.
    """
    layout = self.layout
    if layout.ports:
        self.create_firewall(layout.name, layout.ports)

    for key_pair in self.list_key_pairs():
        if key_pair.name == layout.name:
            # Already registered; nothing more to do.
            return
    self.create_keypair(layout.name, str(layout.ssh_public_key))

cleanup

cleanup(node: Node, **kwargs: dict[str, object]) -> bool
Source code in ogc/provision.py
def cleanup(self, node: Node, **kwargs: dict[str, object]) -> bool:
    """Report a successful cleanup; no provider-side work is performed.

    The previous body was a bare ``pass``, which returned ``None`` despite
    the declared ``bool`` return type.

    Args:
        node: Unused.
        **kwargs: Unused.

    Returns:
        Always ``True``.
    """
    return True

image

image(runs_on: str) -> NodeImage
Source code in ogc/provision.py
def image(self, runs_on: str) -> NodeImage:
    """Resolve ``runs_on`` to an image through the parent class lookup.

    Values starting with ``ami-`` are forwarded unchanged; anything else is
    translated through ``CLOUD_IMAGE_MAP["aws"]["amd64"]``, with an empty
    string forwarded when the map has no entry.
    """
    # FIXME: need proper architecture detection
    image_ref: str = (
        runs_on
        if runs_on.startswith("ami-")
        else CLOUD_IMAGE_MAP["aws"]["amd64"].get(runs_on, "")
    )
    return super().image(image_ref)

create_firewall

create_firewall(name: str, ports: list[str]) -> None

Creates the security group for enabling traffic between nodes

Source code in ogc/provision.py
def create_firewall(self, name: str, ports: list[str]) -> None:
    """Creates the security group for enabling traffic between nodes

    The group is created only when no existing security group carries
    ``name``. Each port entry is then authorized for TCP from ``0.0.0.0/0``.

    Args:
        name: Security group name.
        ports: Entries of the form ``"ingress:egress"``. A bare ``"port"``
            is accepted and treated as ``"port:port"``.

    Raises:
        ValueError: If an entry contains more than one ``":"``.
    """
    if not any(sg.name == name for sg in self.provisioner.ex_get_security_groups()):  # type: ignore
        self.provisioner.ex_create_security_group(name, "ogc sg", vpc_id=None)  # type: ignore

    for port in ports:
        bounds = port.split(":")
        if len(bounds) == 1:
            # Bare port: open just that port instead of failing to unpack.
            bounds = bounds * 2
        ingress, egress = bounds
        self.provisioner.ex_authorize_security_group(  # type: ignore
            name, ingress, egress, "0.0.0.0/0", "tcp"
        )

delete_firewall

delete_firewall(name: str) -> None
Source code in ogc/provision.py
def delete_firewall(self, name: str) -> None:
    """Do nothing; removing a security group is not implemented here.

    Args:
        name: Unused.
    """
    return None

create

create() -> list[MachineModel] | None
Source code in ogc/provision.py
def create(self) -> list[MachineModel] | None:
    """Launch EC2 spot instance(s) for the layout and record each one locally.

    Steps, in order: read the SSH public key, resolve image and size, build
    the driver options, extend the layout's tags and build instance tags,
    create the node(s), then pickle each node into the cache directory and
    save a ``MachineModel`` for it.

    Instance tags are only built when the layout already has tags; they are
    handed to the driver as ``ex_metadata`` (previously the dict was built
    and then discarded).

    Returns:
        The saved ``MachineModel`` objects, or ``None`` when the driver
        returned no nodes.

    Raises:
        ProvisionException: If neither an image nor a username is available.
    """
    layout = self.layout
    pub_key = Path(layout.ssh_public_key).expanduser().read_text()
    auth = NodeAuthSSHKey(pub_key)
    image = self.image(layout.runs_on)
    if not image and not layout.username:
        raise ProvisionException(
            f"Could not locate AMI and/or username for: {layout.runs_on}"
        )

    size = self.sizes(layout.instance_size)[0]

    opts = dict(
        name=f"{str(uuid.uuid4())[:8]}-{layout.name}",
        image=image,
        size=size,
        auth=auth,
        ex_securitygroup=layout.name,
        ex_spot=True,
        ex_maxcount=layout.scale,
        # Windows images get empty userdata.
        ex_userdata=self._userdata() if "windows" not in layout.runs_on else "",
        ex_terminate_on_shutdown=True,
    )
    tags: dict[str, str] = {}

    # Store some metadata for helping with cleanup
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
    if layout.tags:
        user_tag = f"user-{os.environ.get('USER', 'ogc')}"
        layout.tags.append(now)
        layout.tags.append(user_tag)
        tags["created"] = now
        tags["user_tag"] = user_tag
        # Store some extra metadata similar to what other projects use
        epoch = str(datetime.datetime.now().timestamp())
        tags["created_date"] = epoch
        tags["environment"] = "ogc"
        tags["repo"] = "ogc"

    if tags:
        opts["ex_metadata"] = tags

    created = self.provisioner.create_node(**opts)  # type: ignore
    # libcloud's EC2 driver returns a single node when one instance starts
    # and a list when several do, so normalise before iterating.
    nodes = created if isinstance(created, list) else [created]

    _machines = []
    for node in nodes:
        # Keep a pickled copy of the driver node next to the machine record.
        state_file_p = db.cache_path() / node.id
        state_file_p.write_bytes(db.model_as_pickle(node))
        machine = MachineModel(
            layout=layout,
            instance_name=node.name,
            instance_id=node.id,
            instance_state=node.state,
            public_ip=node.public_ips[0],
            private_ip=node.private_ips[0],
            remote_state_file=state_file_p.resolve(),
        )
        machine.save()
        _machines.append(machine)
    return _machines if _machines else None

node

node(**kwargs: dict[str, object]) -> Node
Source code in ogc/provision.py
def node(self, **kwargs: dict[str, object]) -> Node:
    """Fetch a single node from the driver by instance id.

    Args:
        **kwargs: Only ``instance_id`` is consulted; ``None`` is sent to the
            driver when it is absent.

    Returns:
        The first node in the driver's response.

    Raises:
        ProvisionException: If the driver's response is empty.
    """
    wanted_id = kwargs.get("instance_id")
    matches = self.provisioner.list_nodes(ex_node_ids=[wanted_id])
    if not matches:
        raise ProvisionException("Unable to get node information")
    return matches[0]

__str__

__str__() -> str
Source code in ogc/provision.py
def __str__(self) -> str:
    """Return a short label containing the configured ``region`` option."""
    region = self.options["region"]
    return f"<AWSProvisioner [{region}]>"