This repository has been archived by the owner on Jan 14, 2023. It is now read-only.
forked from simple-machines/kafka-tunnel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Instance.py
40 lines (36 loc) · 1.45 KB
/
Instance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import boto3
class Instance:
    """A single endpoint, described by a name, an IP address and a port.

    Attributes:
        name: Label stored for this instance (the code in this file passes
            the service name).
        ip: IP address of the instance.
        port: Port associated with the instance.
    """

    def __init__(self, name, ip, port):
        """Store the given name, IP address and port on the instance."""
        self.name = name
        self.ip = ip
        self.port = port

    def __repr__(self):
        """Return a readable representation for logs and debugging."""
        return "Instance(name={!r}, ip={!r}, port={!r})".format(
            self.name, self.ip, self.port
        )
class RetrieveInstanceIPs:
    """Base class for strategies that produce a list of instances.

    Subclasses are expected to override ``getIps``; calling it on this base
    class always raises ``NotImplementedError``.
    """

    def getIps(self, service, port):
        """Placeholder that subclasses must override.

        Args:
            service: Name of the service to look up.
            port: Port to associate with each instance.

        Raises:
            NotImplementedError: Always, when not overridden.
        """
        message = "Subclass must implement abstract method"
        raise NotImplementedError(message)
class ManualInstances(RetrieveInstanceIPs):
    """Build instances from an explicit, comma-separated list of IPs."""

    def getIps(self, service, ips, port):
        """Return one ``Instance`` per address listed in ``ips``.

        Note that this signature takes an extra ``ips`` argument compared
        with ``RetrieveInstanceIPs.getIps``.

        Args:
            service: Name given to every created instance.
            ips: Comma-separated string of IP addresses. Whitespace around
                each entry is ignored and empty entries (for example from a
                trailing comma or an empty string) are skipped.
            port: Port assigned to every created instance.

        Returns:
            A list of ``Instance`` objects, in the order the addresses appear.
        """
        # Strip each entry so "a, b" does not produce an address with a
        # leading space, and drop blanks so no instance gets an empty IP.
        addresses = (part.strip() for part in ips.split(','))
        return [
            Instance(name=service, ip=address, port=port)
            for address in addresses
            if address
        ]
class AWSInstances(RetrieveInstanceIPs):
    """Discover instances through the AWS EC2 API using their ``Name`` tag."""

    def __init__(self, profile, region):
        """Create a boto3 session for the given profile and region.

        Args:
            profile: Name of the AWS credentials profile to use.
            region: AWS region name used for the session.
        """
        self.profile = profile
        self.region = region
        self.session = boto3.Session(profile_name=profile, region_name=region)

    def getIps(self, service, port):
        """Return one ``Instance`` per private IP found for ``service``.

        Args:
            service: Value of the ``Name`` tag to look up; also used as the
                name of every created instance.
            port: Port assigned to every created instance.

        Returns:
            A list of ``Instance`` objects.
        """
        return [
            Instance(name=service, ip=ec2_ip, port=port)
            for ec2_ip in self.req_aws_ips(service, self.region)
        ]

    def req_aws_ips(self, service, region):
        """Return the private IPs of EC2 instances tagged ``Name=service``.

        Args:
            service: Value matched against the ``Name`` tag.
            region: Unused; kept so existing callers keep working. The EC2
                client is created from the session built in ``__init__``,
                which already carries the region.

        Returns:
            A list of private IP address strings. Instances that report no
            ``PrivateIpAddress`` are skipped.
        """
        client = self.session.client('ec2')
        name_filter = [{'Name': 'tag:Name', 'Values': [service]}]
        # Use the paginator so results spread over several response pages
        # are all collected instead of only the first page.
        paginator = client.get_paginator('describe_instances')
        ips = []
        for page in paginator.paginate(Filters=name_filter):
            # "or []" guards against a missing or null key in the response.
            for reservation in page.get('Reservations') or []:
                for instance in reservation.get('Instances') or []:
                    ip = instance.get(u'PrivateIpAddress')
                    if ip is not None:
                        ips.append(ip)
        return ips