How to send data from a Python asyncio socket server to a subprocess?

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP

How to send data from a Python asyncio socket server to a subprocess?



Python 3.6



This program:



The problem is step 4. I can't work out how to send the received PNG image from the coroutine to the stdin of the ffmpeg subprocess. Can anyone please point me in the right direction to send the PNG image to the stdin of the ffmpeg subprocess?



EDIT: to clarify - there's nothing wrong with this code; it receives the PNGs fine over the socket. I just don't have any idea how to send the PNGs on into the stdin of ffmpeg. I've done quite a lot of Python, but asyncio is new to me and how things tie together is a mystery.



thanks!


import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

# Command-line interface. Both options are mandatory: when either one is
# missing, a message is printed and the script exits with status 1.
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
    print("port is required")
    sys.exit(1)
if not args.outputfilename:
    print("outputfilename is required")
    sys.exit(1)

async def _read_stream(stream, cb):
    """Pass every line read from *stream* to *cb* until the stream ends.

    Args:
        stream: object with an awaitable ``readline()`` method.
        cb: callable invoked synchronously with each line, exactly as read.

    An empty result from ``readline()`` is treated as end-of-stream and
    terminates the loop.
    """
    while True:
        line = await stream.readline()
        if not line:
            break
        cb(line)

async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    """Run *cmd* as a subprocess and stream its output to the callbacks.

    Args:
        cmd: argv sequence; ``cmd[0]`` is the program to execute.
        stdout_cb: called with each line read from the child's stdout.
        stderr_cb: called with each line read from the child's stderr.

    Returns:
        The value of ``process.wait()`` once both output pipes have been
        read to the end.
    """
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # gather() wraps the coroutines in tasks itself; passing bare coroutines
    # to asyncio.wait() is rejected on Python 3.11+.
    await asyncio.gather(
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb),
    )
    return await process.wait()


def process_stderr(line):
    """Echo one line of ffmpeg's stderr and exit once the output is reported.

    Args:
        line: raw bytes of a single stderr line.

    Raises:
        SystemExit: with status 0 when the line contains both ``"Output"``
            and the configured output filename (``args.outputfilename``).
    """
    # ffmpeg finishes processing and writes the output file when its input is closed
    # thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
    # errors="replace": a byte sequence that is not valid UTF-8 must not
    # crash the log reader.
    line = line.decode(errors="replace")
    print(line)
    if "Output" in line and args.outputfilename in line:
        print('finished!!!!')
        sys.exit(0)

def process_stdout(line):
    """Print one line of the subprocess's stdout, prefixed with ``STDOUT: ``.

    The line is printed as received; no decoding is applied, so a bytes
    value shows up in its ``b'...'`` form.
    """
    print(f"STDOUT: {line}")

def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    """Build the ffmpeg command line that encodes images read from stdin.

    Despite its name this function starts nothing; it only assembles the
    command.

    Args:
        listenport: accepted for interface compatibility; currently unused.
        outputfilename: file name appended to the hard-coded output directory.
        framerate: value passed to ffmpeg's ``-framerate`` option.
        format: accepted for interface compatibility; currently unused.

    Returns:
        The command as a single space-separated string (with a trailing
        space); callers are expected to ``split()`` it into an argv list.
    """
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0"  # stdin

    # Parenthesised adjacent literals form one string; only the pieces that
    # interpolate a value need to be f-strings.
    params = (
        "ffmpeg "
        "-loglevel 56 "
        f"-y -framerate {framerate} "
        "-f image2pipe "
        f"-i {input_type} "
        "-c:v libvpx-vp9 "
        "-b:v 1024k "
        "-q:v 0 "
        "-pix_fmt yuva420p "
        f"{outputdirectory}{outputfilename} "
    )

    return params


async def socket_png_receiver(reader, writer):
    """Receive length-prefixed, base64-encoded PNG frames from one client.

    Each frame on the wire is a 4-byte network-order unsigned length
    followed by that many bytes of base64 text. A length of 0 marks the end
    of the stream. The decoded frames are not forwarded anywhere yet.

    Args:
        reader: stream the frames are read from.
        writer: stream back to the client; unused here.
    """
    while True:
        # first the client sends the length of the data to us
        lengthbuf = await reader.read(4)
        length, = unpack('!I', lengthbuf)
        if length == 0:
            print("length was 0, finish")  # a zero length PNG says that there are no more frames
            break
        # then we read the PNG
        data = await reader.read(length)
        data = data.decode()  # from bytes to string
        png_bytes = base64.b64decode(data)  # from base64 to bytes
        # next line was just a guess, so I have commented it out.
        #await proc.communicate(png_bytes)
        print("Got PNG, length", length)
    return


# Script entry point: run ffmpeg and the PNG socket server on one event loop.
# An explicit loop is created because asyncio.get_event_loop() is deprecated
# when no loop is current.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
#coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port, ffmpeg_process, loop=loop)
# start_server() binds to the current loop; its loop= argument no longer
# exists on Python 3.10+.
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port)
several_futures = asyncio.gather(ffmpeg_process, coro)
try:
    # gather() returns its results in argument order:
    # [ffmpeg exit status, Server object].
    _returncode, server = loop.run_until_complete(several_futures)
    server.close()
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()



Here are the changes suggested by @user4815162342


import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

# Command-line interface. Both options are mandatory: when either one is
# missing, a message is printed and the script exits with status 1.
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()
if not args.port:
    print("port is required")
    sys.exit(1)
if not args.outputfilename:
    print("outputfilename is required")
    sys.exit(1)

async def _read_stream(stream, cb):
    """Pass every line read from *stream* to *cb* until the stream ends.

    Args:
        stream: object with an awaitable ``readline()`` method.
        cb: callable invoked synchronously with each line, exactly as read.

    An empty result from ``readline()`` is treated as end-of-stream and
    terminates the loop.
    """
    while True:
        line = await stream.readline()
        if not line:
            break
        cb(line)


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    """Run *cmd* as a subprocess and stream its output to the callbacks.

    The created process object is stored in the module-level name
    ``process`` so that other coroutines can write to its stdin.

    Args:
        cmd: argv sequence; ``cmd[0]`` is the program to execute.
        stdout_cb: called with each line read from the child's stdout.
        stderr_cb: called with each line read from the child's stderr.

    Returns:
        The value of ``process.wait()`` once both output pipes have been
        read to the end.
    """
    # `global` is a statement of its own; it cannot be combined with an
    # assignment on the same line.
    global process
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # gather() wraps the coroutines in tasks itself; passing bare coroutines
    # to asyncio.wait() is rejected on Python 3.11+.
    await asyncio.gather(
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb),
    )
    return await process.wait()


def process_stderr(line):
    """Echo one line of ffmpeg's stderr and exit once the output is reported.

    Args:
        line: raw bytes of a single stderr line.

    Raises:
        SystemExit: with status 0 when the line contains both ``"Output"``
            and the configured output filename (``args.outputfilename``).
    """
    # ffmpeg finishes processing and writes the output file when its input is closed
    # thus the completion message will come out of stderr only when the socket or stdin or whatever is closed
    # errors="replace": a byte sequence that is not valid UTF-8 must not
    # crash the log reader.
    line = line.decode(errors="replace")
    print(line)
    if "Output" in line and args.outputfilename in line:
        print('finished!!!!')
        sys.exit(0)


def process_stdout(line):
    """Print one line of the subprocess's stdout, prefixed with ``STDOUT: ``.

    The line is printed as received; no decoding is applied, so a bytes
    value shows up in its ``b'...'`` form.
    """
    print(f"STDOUT: {line}")


def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    """Build the ffmpeg command line that encodes images read from stdin.

    Despite its name this function starts nothing; it only assembles the
    command.

    Args:
        listenport: accepted for interface compatibility; currently unused.
        outputfilename: file name appended to the hard-coded output directory.
        framerate: value passed to ffmpeg's ``-framerate`` option.
        format: accepted for interface compatibility; currently unused.

    Returns:
        The command as a single space-separated string (with a trailing
        space); callers are expected to ``split()`` it into an argv list.
    """
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0"  # stdin

    # Parenthesised adjacent literals form one string; only the pieces that
    # interpolate a value need to be f-strings.
    params = (
        "ffmpeg "
        "-loglevel 56 "
        f"-y -framerate {framerate} "
        "-f image2pipe "
        f"-i {input_type} "
        "-c:v libvpx-vp9 "
        "-b:v 1024k "
        "-q:v 0 "
        "-pix_fmt yuva420p "
        f"{outputdirectory}{outputfilename} "
    )

    return params


async def socket_png_receiver(reader, writer):
    """Forward length-prefixed, base64-encoded PNG frames to ffmpeg's stdin.

    Each frame on the wire is a 4-byte network-order unsigned length
    followed by exactly that many bytes of base64 text. A length of 0 marks
    the end of the stream.

    Args:
        reader: stream the frames are read from.
        writer: stream back to the client; closed when the handler finishes.

    Raises:
        asyncio.IncompleteReadError: raised by ``readexactly()`` if the
            client disconnects in the middle of a header or frame.

    NOTE(review): relies on the module-level ``process`` having been set
    before the first client connects - confirm the start-up order.
    """
    try:
        while True:
            # first the client sends the length of the data to us
            lengthbuf = await reader.readexactly(4)
            length, = unpack('!I', lengthbuf)
            if length == 0:
                print("length was 0, finish")  # a zero length PNG says that there are no more frames
                break
            # then we read the PNG
            data = await reader.readexactly(length)
            png_bytes = base64.b64decode(data)  # from base64 to bytes
            process.stdin.write(png_bytes)
            # write() only buffers; drain() waits until the pipe has taken
            # the data, so a fast client cannot grow the buffer unbounded.
            await process.stdin.drain()
            print("Got PNG, length", length)
    finally:
        # ffmpeg only finishes the output file once its input is closed, so
        # stdin must be closed whether the stream ended cleanly or not.
        process.stdin.close()
        writer.close()


# Script entry point: run ffmpeg and the PNG socket server on one event loop.
# An explicit loop is created because asyncio.get_event_loop() is deprecated
# when no loop is current.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
# start_server() binds to the current loop; its loop= argument no longer
# exists on Python 3.10+.
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port)
several_futures = asyncio.gather(ffmpeg_process, coro)
try:
    # gather() returns its results in argument order:
    # [ffmpeg exit status, Server object].
    _returncode, server = loop.run_until_complete(several_futures)
    server.close()
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()





The question doesn't specify what exactly goes wrong with the provided implementation, but perhaps you should try to send the data using: proc.stdin.write(png_bytes) followed by proc.stdin.close()?
– user4815162342
1 hour ago



proc.stdin.write(png_bytes)


proc.stdin.close()





Also, await reader.read(length) should really be await reader.readexactly(length). The argument to StreamReader.read is the maximum, not exact number of bytes to read.
– user4815162342
1 hour ago


await reader.read(length)


await reader.readexactly(length)


StreamReader.read





Another suspect thing is that socket_png_receiver appears to expect that it would keep sending data to the same ffmpeg process. That means that ffmpeg must be able to process multiple chunks of data on its stdin, i.e. that it needs to have a delimiting protocol like you implement by packing the length before the data. I suspect that you should instead start a new ffmpeg process in each iteration of the loop in socket_png_receiver.
– user4815162342
1 hour ago


socket_png_receiver


socket_png_receiver





@user4815162342 I've updated the code - there was an error in the line starting coro, so I have corrected it and left the old line commented out. I have also commented out await proc.communicate(png_bytes) because it was nothing more than a voodoo guess as to how to achieve my goal and just caused an error. So, the code actually works fine, except there's just no mechanism for sending data to the ffmpeg stdin because I just can't figure out how at all.
– Duke Dougal
1 hour ago





@user4815162342 you are correct - socket_png_receiver will send ONLY to the one ffmpeg process. The idea is that once the sequence of PNG images is finished then both ffmpeg and this program that wraps it will exit.
– Duke Dougal
1 hour ago




1 Answer
1



There are several issues with the code:



await reader.read(length) should be await reader.readexactly(length) because the argument to StreamReader.read is the maximum number of bytes to read, and it can return fewer.


await reader.read(length)


await reader.readexactly(length)


StreamReader.read



proc.communicate(png_bytes) should be changed to proc.stdin.write(png_bytes). The call to communicate() is incorrect here because you want to continue talking to the program, while communicate() waits for all the streams to close.


proc.communicate(png_bytes)


proc.stdin.write(png_bytes)


communicate()


communicate()



The instance of process returned by asyncio.create_subprocess_exec(...) must be made available to socket_png_receiver, e.g. by making the process variable global using global process. (It would be better to use a class and assign to self.process, but that is beyond the scope of this answer.)


asyncio.create_subprocess_exec(...)


socket_png_receiver


process


global process


self.process



Some potential issues:



There is no need to decode data from bytes to string, base64.b64decode can accept bytes just fine.


data


base64.b64decode



spawn_ffmpeg() doesn't appear to use its listenport parameter.


spawn_ffmpeg()


listenport






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Comments

Popular posts from this blog

Executable numpy error

Trying to Print Gridster Items to PDF without overlapping contents

Mass disable jenkins jobs