Sep 26-28, 2018 Vikrant Patil
These notes are available online at http://notes.pipal.in/2018/vmware-practical-master-sep/
© Pipal Academy LLP
We will be using the Anaconda (Python 3) distribution for this training. It can be downloaded from the official Anaconda website.
%%file wc.py
import sys
def linecount(filename):
    """Return the number of lines in the file at ``filename``.

    The file is iterated line by line, so even very large files are
    counted without loading all of their lines into memory.
    """
    with open(filename) as f:
        return sum(1 for _ in f)
def wordcount(filename):
    """Return the number of whitespace-separated words in ``filename``.

    Words are counted one line at a time; a newline is itself whitespace,
    so no word can span two lines and the total equals splitting the
    whole file at once, without holding the whole file in memory.
    """
    with open(filename) as f:
        return sum(len(line.split()) for line in f)
def charcount(filename):
    """Return the number of characters in the text file ``filename``."""
    with open(filename) as handle:
        contents = handle.read()
    return len(contents)
if __name__ == "__main__":
    # The file to analyse is the first command-line argument.
    target = sys.argv[1]
    counts = (linecount(target), wordcount(target), charcount(target))
    # Same layout as the Unix ``wc`` tool: lines, words, characters, name.
    print(*counts, target)
!python wc.py day1.html
sys.argv is the simplest way of taking arguments from the command line, but you have to manage the arguments on your own. It becomes very difficult to manage a script with complicated arguments.
We will use argparse module to do this for us.
%%file grep.py
import argparse
def parse_args():
    """Parse the command line: a search pattern followed by a file name."""
    cli = argparse.ArgumentParser(prog="grep")
    # Positional arguments are declared by name, without a leading dash.
    positionals = (
        ("pattern", "Pattern to look for"),
        ("filename", "Look for pattern in this file"),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, help=arg_help)
    return cli.parse_args()
def grep(pattern, file):
    """Print every line of ``file`` that contains the substring ``pattern``."""
    with open(file) as handle:
        for text in handle:
            if pattern not in text:
                continue
            # Lines keep their own newline, so suppress print's.
            print(text, end="")
if __name__ == "__main__":
    options = parse_args()
    # Show the parsed namespace so the effect of argparse is visible.
    print(options)
    grep(options.pattern, options.filename)
!python grep.py --help
!python grep.py "def" grep.py
%%file grep1.py
import argparse
def parse_args():
    """Parse the command line: pattern, file name and optional -m/--maxcount."""
    cli = argparse.ArgumentParser(prog="grep")
    # Positional arguments are declared by name, without a leading dash.
    cli.add_argument("pattern", help="Pattern to look for")
    cli.add_argument("filename", help="Look for pattern in this file")
    # Optional arguments start with a dash; the value is converted to int.
    maxcount_help = (
        "How many lines to match. command will stop after printing maxcount matches"
    )
    cli.add_argument("-m", "--maxcount", type=int, help=maxcount_help)
    return cli.parse_args()
def grepmax(pattern, file, maxcount):
    """Return at most ``maxcount`` lines of ``file`` that contain ``pattern``.

    Args:
        pattern: substring to look for in each line.
        file: path of the text file to search.
        maxcount: upper bound on the number of matching lines returned.

    Returns:
        A list of matching lines (newlines included). It is shorter than
        ``maxcount`` when the file has fewer matches; calling ``next()``
        ``maxcount`` times would raise StopIteration in that case.
    """
    # Local import: the script imports only argparse at module level.
    from itertools import islice

    matches = grep(pattern, file)
    try:
        # A negative maxcount yields an empty list, as range() would.
        return list(islice(matches, max(maxcount, 0)))
    finally:
        # Close the suspended generator so the file is released promptly.
        matches.close()


def grep(pattern, file):
    """Lazily yield each line of ``file`` that contains ``pattern``."""
    with open(file) as f:
        for line in f:
            if pattern in line:
                yield line
def printlines(lines):
    """Print each item of ``lines`` as-is, without adding a newline."""
    for text in lines:
        print(text, sep="", end="")
if __name__ == "__main__":
    options = parse_args()
    print(options)
    # No -m given (or -m 0): print every match; otherwise cap the matches.
    if not options.maxcount:
        selected = grep(options.pattern, options.filename)
    else:
        selected = grepmax(options.pattern, options.filename, options.maxcount)
    printlines(selected)
!python grep1.py --help
!python grep1.py "div" day1.html
!python grep1.py -m 5 "div" day1.html
!python grep1.py --maxcount 5 "div" day1.html
from collections import deque
dq = deque(maxlen=5)
dq.append(1)
dq.append(2)
dq.append(3)
dq.append(4)
dq.append(5)
dq
dq.append(6)
dq
%%file grep2.py
import argparse
from collections import deque
def parse_args():
    """Parse the command line: pattern, file name, -m, -c and -v options."""
    cli = argparse.ArgumentParser(prog="grep")
    # Positional arguments are declared by name, without a leading dash.
    cli.add_argument("pattern", help="Pattern to look for")
    cli.add_argument("filename", help="Look for pattern in this file")
    # Optional arguments start with a dash; values are converted to int.
    maxcount_help = (
        "How many lines to match. command will stop after printing maxcount matches"
    )
    cli.add_argument("-m", "--maxcount", type=int, help=maxcount_help)
    cli.add_argument(
        "-c",
        "--context",
        type=int,
        default=1,
        help="Show context of these many lines for match",
    )
    # A flag takes no value: it is True when present and False otherwise.
    cli.add_argument("-v", "--invert", action="store_true", help="Invert the match")
    return cli.parse_args()
def take(seq, n):
    """Return the first ``n`` items of ``seq`` as a list.

    Args:
        seq: any iterable; an iterator is advanced by at most ``n`` items.
        n: maximum number of items to take (negative counts as zero).

    Returns:
        A list of up to ``n`` items. It is shorter when ``seq`` runs out
        early; calling ``next()`` ``n`` times would raise StopIteration.
    """
    # Local import: the script does not import itertools at module level.
    from itertools import islice

    return list(islice(seq, max(n, 0)))
def grep(pattern, file, context):
    """Yield matching lines of ``file`` together with their leading context.

    A rolling window of the last ``context`` lines is kept. When the newest
    line contains ``pattern`` the whole window is yielded, oldest line
    first and the match last, and the window is then emptied.
    """
    window = deque(maxlen=context)
    with open(file) as handle:
        for text in handle:
            window.append(text)
            if pattern not in text:
                continue
            yield from window
            # Start afresh so already-printed lines are not repeated.
            window.clear()
def printlines(lines):
    """Print each item of ``lines`` as-is, without adding a newline."""
    for text in lines:
        print(text, sep="", end="")
if __name__ == "__main__":
    args = parse_args()
    print(args)
    # NOTE: args.invert is parsed but not applied anywhere in this script.
    lines = grep(args.pattern, args.filename, args.context)
    if args.maxcount:
        # Count real matches instead of assuming maxcount*context lines.
        # A match near the top of the file, or right after another match,
        # has fewer than ``context`` lines in front of it, so a fixed line
        # budget lets extra matches through or runs out of lines.
        # grep() yields and clears its window on every matching line, so
        # within its output only the match itself contains the pattern.
        matches = 0
        for line in lines:
            print(line, end="")
            if args.pattern in line:
                matches += 1
                if matches >= args.maxcount:
                    break
    else:
        printlines(lines)
def f(n):
    """Generate the integers 0, 1, ..., n-1 one at a time."""
    for value in range(n):
        yield value
list(f(4))
!python grep2.py --help
!python grep2.py -m2 -c3 "def" grep2.py
parser = argparse.ArgumentParser(prog='PROG')
group = parser.add_mutually_exclusive_group()
group.add_argument('--foo', action='store_true')
group.add_argument('--bar', action='store_false')
parser.parse_args(['--foo'])
parser.parse_args(["--foo", "--bar"])
parser.parse_args([ "--bar"])
%%file hello.py
import click
# No docstring on purpose: click would show it in the --help output.
@click.command()
@click.argument("name")
def hello(name):
    # NAME is a required positional argument supplied by click.
    parts = ("hello", name, "!")
    print(*parts)


if __name__ == "__main__":
    # Calling the command makes click parse the command line.
    hello()
!python hello.py
!python hello.py --help
%%file hello.py
import click
# No docstring on purpose: click would show it in the --help output.
@click.command()
@click.argument("name")
@click.argument("greeting")
def hello(name, greeting):
    # NAME and GREETING are required positional arguments.
    parts = ("hello", name, greeting, "!")
    print(*parts)


if __name__ == "__main__":
    # Calling the command makes click parse the command line.
    hello()
!python hello.py --help
%%file hello.py
#!/home/vikrant/usr/local/anaconda3/bin/python3
import click
# No docstring on purpose: click would show it in the --help output.
@click.command()
@click.option("--upper", is_flag=True, help="Print in upper case")
@click.argument("name")
@click.argument("greeting")
def hello(name, greeting, upper):
    message = " ".join(("hello", name, greeting, "!"))
    # --upper is a flag: True when given on the command line.
    print(message.upper() if upper else message)


if __name__ == "__main__":
    # Calling the command makes click parse the command line.
    hello()
!python hello.py --upper vmware morning
%%file hello.py
import click
import sys
# No docstring on purpose: click would show it in the --help output.
@click.command()
@click.option("--upper", is_flag=True, help="Print in upper case")
@click.argument("name")
@click.argument("greeting")
def hello(name, greeting, upper):
    message = " ".join(("hello", name, greeting, "!"))
    # --upper is a flag: True when given on the command line.
    print(message.upper() if upper else message)
# No docstring on purpose: click would show it in the --help output.
@click.command()
@click.argument("name1")
def echo(name1):
    # NAME1 is a required positional argument supplied by click.
    words = ("echo", name1)
    print(*words)
if __name__ == "__main__":
    # Hand-rolled sub-command dispatch: the first command-line word picks
    # the click command to run.
    subcommands = {"hello": hello, "echo": echo}
    # With no arguments at all, behave like --help rather than crashing
    # with an IndexError on sys.argv[1].
    requested = sys.argv[1] if len(sys.argv) > 1 else "--help"
    if requested in subcommands:
        # Pass only the words after the sub-command name. By default click
        # parses sys.argv[1:], which would treat the sub-command name as
        # the first positional argument and reject the real last one.
        subcommands[requested](
            args=sys.argv[2:],
            prog_name=sys.argv[0] + " " + requested,
        )
    else:
        # --help, or an unknown sub-command: list what is available.
        print("subcomands are:\nhello\necho")
!python hello.py --help
!python hello.py echo --help
!python hello.py hello --help
!python hello.py hello vmware morning
import os
Directory as module!
os.mkdir("package")
os.makedirs("package/foo/nested")
!touch package/foo/__init__.py
!touch package/foo/nested/__init__.py
!tree package/
os.getcwd()
os.chdir("package/")
os.getcwd()
import foo
from foo import nested
!cat >> foo/hello.py
print("hello.py")
!cat foo/hello.py
from foo import hello
hello.add(2,3)
%%file setup.py
from distutils.core import setup
# Package metadata for the "foo" demo distribution.
setup(
    name="foo",
    version='1.0',
    description="Fool library for packaging demo!",
    author="Vikrant",
    # The metadata field is ``author_email``; a bare ``email`` keyword is
    # not a recognised option and would be dropped with a warning.
    author_email="vikrant@foo.net",
    url="www.foo.net",
    # Every package and sub-package has to be listed explicitly.
    packages=["foo", "foo.nested"],
)
!tree
os.mkdir("bar")
!python setup.py sdist #source distribution
!tree
os.chdir("dist")
!ls
import shutil
shutil.unpack_archive("foo-1.0.tar.gz")
!lsal
!python3.5 -m venv fooenv
!ls
$ source fooenv/bin/activate
(fooenv)$ cd foo-1.0
(fooenv)$ python setup.py install
This will install your foo package in the "fooenv" virtual environment. You can then start the Python interpreter and import the foo module.
!pwd
os.chdir("/home/vikrant/trainings/2018/vmware-practical-master-sep")
os.mkdir("executable_folder")
os.chdir("executable_folder/")
%%file __main__.py
print("Hello from executable folder")
%%file __init__.py
pass
os.chdir("/home/vikrant/trainings/2018/vmware-practical-master-sep")
!python executable_folder/
shutil.make_archive(format="zip", base_name="executable_folder", base_dir="executable_folder/")
!python executable_folder.zip
pip freeze > requirements.txt collects the names of all the dependencies in requirements.txt
pip install -r requirements.txt installs all the dependencies listed in requirements.txt
import subprocess
# Run "ls -l" and wait for it to finish. The command's output goes
# straight to the terminal because no pipes are requested here.
p = subprocess.run(["ls","-l"])
# The argument list the process was started with.
p.args
# Inspect the result object bound above; the name ``results`` was never
# defined. Both attributes are None because stdout/stderr were not
# captured with subprocess.PIPE.
p.stdout
p.stderr
r = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE)
r.stdout
r.stdout.decode("utf-8")
r = subprocess.run(["cat", "xyz.txt"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
r.stdout
r.stderr
subprocess.check_output(["ls","-l"])
subprocess.run(["ls", "-l"], stdout=subprocess.PIPE).stdout
import sys
sys.stdout.write("hello world")
print("hello")
sys.stderr.write("Error!")
%%file echo.py
import sys
if __name__ == "__main__":
    # Copy standard input to standard output line by line, and report
    # each line on standard error as well.
    for text in sys.stdin:
        print("Debug: got line" + text, end="", file=sys.stderr)
        print(text, end="", file=sys.stdout)
p = subprocess.Popen(["python", "echo.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
p
lines = b"""one
two
three
four
five
"""
stdout, stderr = p.communicate(lines)
print(stdout.decode("utf-8"))
print(stderr.decode("utf-8"))
problem
import threading
import time
def tick(n):
    """Print "Tick- i" for i = 0..n-1, pausing two seconds before each."""
    pause_seconds = 2
    for step in range(n):
        time.sleep(pause_seconds)
        print("Tick-", step)
def rtick(n):
    """Print "Reversed Tick- i" for i = n-1..0, pausing one second before each."""
    pause_seconds = 1
    countdown = reversed(range(n))
    for step in countdown:
        time.sleep(pause_seconds)
        print("Reversed Tick-", step)
t = threading.Thread(target=tick, args=(10,))
t.start()
t = threading.Thread(target=tick, args=(10,))
t1 = threading.Thread(target=rtick, args=(10,))
t.start()
t1.start()
t.join()
print("Control back to main thread!")
(1,2)
(1)
(1,)
%%file wthreads.py
import sys
import threading
import requests
def download(url):
    """Fetch ``url`` with an HTTP GET and discard the response."""
    requests.get(url)
def tasks(concurrency):
    """Download ten URLs in batches of ``concurrency`` threads.

    Each batch is started together and joined before the next batch begins.

    Args:
        concurrency: number of threads to run at once; must be at least 1.

    Raises:
        ValueError: if ``concurrency`` is below 1. A batch of zero threads
            never shrinks the work list, so the loop would never end.
    """
    if concurrency < 1:
        raise ValueError(
            "concurrency must be at least 1, got {}".format(concurrency)
        )
    urls = ["http://httpbin.org/get" for i in range(10)]
    while urls:
        # The last batch may be smaller than the requested concurrency.
        tcount = min(concurrency, len(urls))
        print("Starting {} threads".format(tcount))
        threads = [
            threading.Thread(target=download, args=(urls.pop(),))
            for _ in range(tcount)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
if __name__ == "__main__":
    # First command-line argument: number of threads per batch.
    concurrency = int(sys.argv[1])
    tasks(concurrency)
!time -p python wthreads.py 1
!time -p python wthreads.py 2
l.pop()
%%file cthreads.py
import sys
import threading
import requests
def saxpy(url, rows=10000, cols=1000):
    """Burn CPU time by summing i*j over a ``rows`` x ``cols`` grid.

    Args:
        url: ignored; kept so this function has the same call signature
            as a download task.
        rows: number of outer-loop iterations (default 10000).
        cols: number of inner-loop iterations (default 1000).

    Returns:
        The accumulated sum as a float.
    """
    s = 0.0
    for i in range(rows):
        for j in range(cols):
            s += i*j*1.0
    return s
def tasks(concurrency):
    """Run ten CPU-bound ``saxpy`` jobs in batches of ``concurrency`` threads.

    Each batch is started together and joined before the next batch begins.

    Args:
        concurrency: number of threads to run at once; must be at least 1.

    Raises:
        ValueError: if ``concurrency`` is below 1. A batch of zero threads
            never shrinks the work list, so the loop would never end.
    """
    if concurrency < 1:
        raise ValueError(
            "concurrency must be at least 1, got {}".format(concurrency)
        )
    urls = ["http://httpbin.org/get" for i in range(10)]
    while urls:
        # The last batch may be smaller than the requested concurrency.
        tcount = min(concurrency, len(urls))
        print("Starting {} threads".format(tcount))
        threads = [
            threading.Thread(target=saxpy, args=(urls.pop(),))
            for _ in range(tcount)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
if __name__ == "__main__":
    # First command-line argument: number of threads per batch.
    concurrency = int(sys.argv[1])
    tasks(concurrency)
!time -p python cthreads.py 2
!time -p python cthreads.py 1
%%file tpool.py
import sys
import requests
from multiprocessing.pool import ThreadPool
def task(url):
    """Fetch ``url`` with an HTTP GET and discard the response."""
    requests.get(url)
def threadpool(concurrency):
    """Fetch ten URLs using a pool of ``concurrency`` worker threads.

    Args:
        concurrency: number of worker threads in the pool.
    """
    urls = ["http://httpbin.org/get" for i in range(10)]
    # The context manager shuts the pool down when the block ends, so the
    # worker threads are not left behind. map() blocks until every task
    # has finished, so nothing is cut short.
    with ThreadPool(concurrency) as pool:
        pool.map(task, urls)
if __name__ == "__main__":
    # First command-line argument: number of worker threads in the pool.
    concurrency = int(sys.argv[1])
    threadpool(concurrency)
!time -p python tpool.py 1
!time -p python tpool.py 2
!time -p python tpool.py 4
%%file processpool.py
import sys
import requests
from multiprocessing.pool import Pool
def saxpy(url, rows=10000, cols=1000):
    """Burn CPU time by summing i*j over a ``rows`` x ``cols`` grid.

    Args:
        url: ignored; kept so this function has the same call signature
            as a download task.
        rows: number of outer-loop iterations (default 10000).
        cols: number of inner-loop iterations (default 1000).

    Returns:
        The accumulated sum as a float.
    """
    s = 0.0
    for i in range(rows):
        for j in range(cols):
            s += i*j*1.0
    return s
def processpool(concurrency):
    """Run one ``saxpy`` job in a pool of ``concurrency`` worker processes.

    The job is submitted asynchronously and its result printed; waiting
    for the result gives up after five seconds.

    Args:
        concurrency: number of worker processes in the pool.
    """
    urls = ["http://httpbin.org/get" for i in range(10)]
    # The context manager shuts the pool down when the block ends, so the
    # result must be collected inside the block.
    with Pool(concurrency) as pool:
        # Alternatives for running the whole list:
        #   r = pool.map(saxpy, urls)
        #   r = pool.imap(saxpy, urls); print(next(r))
        # Call pop() so a URL string is sent to the worker, not the
        # bound method object itself.
        r = pool.apply_async(saxpy, (urls.pop(),))
        print(r.get(timeout=5))
if __name__ == "__main__":
    # First command-line argument: number of worker processes in the pool.
    concurrency = int(sys.argv[1])
    processpool(concurrency)
!time -p python processpool.py 1
!time -p python processpool.py 2
import threading
import time
class Clock(threading.Thread):
    """Background thread that prints an increasing tick count every two seconds.

    The counter can be reset from another thread with ``reset()`` and the
    thread is asked to finish with ``stop()``.
    """

    def __init__(self , *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set by stop() to tell run() to leave its loop.
        self.event = threading.Event()
        # Current tick number; guarded by self.lock.
        self.count = 0
        self.lock = threading.Lock()

    def run(self):
        """Print and advance the counter until ``stop()`` is called."""
        while not self.event.is_set():
            # Read and increment under one lock hold so a concurrent
            # reset() cannot slip in between the two steps.
            with self.lock:
                current = self.count
                self.count += 1
            print("Tick-", current)
            # Wait on the event rather than sleeping, so that stop()
            # wakes the thread immediately instead of after two seconds.
            self.event.wait(2)

    def stop(self):
        """Ask the clock thread to finish."""
        self.event.set()

    def reset(self):
        """Set the tick counter back to zero."""
        with self.lock:
            self.count = 0
c = Clock()
c.start()
c.reset()
c.stop()
import requests
r = requests.get("http://httpbin.org/get")
print(r.text)
r.status_code
r = requests.get("http://httpbin.org/html")
r.status_code
print(r.text[:100])
r = requests.get("http://httpbin.org/get", params={"query":"xyz","abc":"hello","language":"python"})
print(r.text)
rr = requests.post("http://httpbin.org/post", data="plain text")
print(rr.text)
r = requests.post("http://httpbin.org/post", data={"name":"python","email":"py@xyz.com"})
print(r.text)
url = "https://api.github.com/orgs/vmware/repos"
repos = requests.get("https://api.github.com/orgs/vmware/repos").json()
type(repos)
repos[0]
for repo in repos:
print(repo['full_name'], repo['forks'])
def get_forks(r):
    """Sort key: return the value stored under 'forks' in the mapping ``r``."""
    fork_count = r['forks']
    return fork_count
for repo in sorted(repos, key=get_forks):
print(repo['full_name'], repo['forks'])
for repo in sorted(repos, key=get_forks, reverse=True):
print(repo['full_name'], repo['forks'])
for repo in sorted(repos, key=get_forks, reverse=True)[:5]:
print(repo['full_name'], repo['forks'])
from fabric2 import Connection
c = Connection("localhost")
r = c.run("ls")
r.return_code
from fabric2 import SerialGroup
g = SerialGroup("localhost", "localhost", "localhost")
r = g.run("ls")
type(r)
help(Connection)
from invoke import Responder
references