使用Kinesis Video Stream接入视频类设备业务
目前有需求,当设备被某个事件触发时,设备会上传事件先后时间的视频到Kinesis Video Stream(后面简称kvs)上,在kvs上需要按时间触发的前后10秒对视频进行截取,并且拿到事件发生的截图作为视频的封面,最后将视频和截图一起上传到S3
-
接收设备事件,通过aws iot的消息路由到指定Lambda
-
在Lambda中,通过消息的事件戳确定视频起始和结束时间,然后使用GetClip api来获取到指定时间内的视频
-
在Lambda中,使用
ffmpeg-platform
获取到事件触发时的截图,最后将视和截图频上传到S3
上面步骤需要在kvs提前创建好流,并且设备已经将视频上传
具体实现
导入jar包
software.amazon.awssdk kinesisvideoarchivedmedia 2.20.48 software.amazon.awssdk kinesisvideo 2.20.48 org.bytedeco javacv 1.5.6 org.bytedeco ffmpeg-platform 4.4-1.5.6代码实现
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.kinesisvideo.KinesisVideoClient;
import software.amazon.awssdk.services.kinesisvideo.model.APIName;
import software.amazon.awssdk.services.kinesisvideo.model.GetDataEndpointRequest;
import software.amazon.awssdk.services.kinesisvideo.model.GetDataEndpointResponse;
import software.amazon.awssdk.services.kinesisvideoarchivedmedia.KinesisVideoArchivedMediaAsyncClient;
import software.amazon.awssdk.services.kinesisvideoarchivedmedia.model.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
/**
- @author: caoyu
- @CreateTime: 2023-05-20
- @Description: TODO
/
public class AwsKvsUtils {
private static final Logger logger = LogManager.getLogger(AwsKvsUtils.class);
private static KinesisVideoArchivedMediaAsyncClient kvsClient;
private static S3Utils s3Utils;
public static AwsKvsUtils build(AwsCredentialsProvider awsCredentialsProvider, URI endPoint, Region region) {
KinesisVideoArchivedMediaAsyncClient client = KinesisVideoArchivedMediaAsyncClient
.builder()
.credentialsProvider(awsCredentialsProvider)
.region(region)
.endpointOverride(endPoint)
.build();
S3Utils s3Utils = S3Utils.build(awsCredentialsProvider, region);
return new AwsKvsUtils(client,s3Utils);
}
public void getClip(String streamName, Long time, String bucketName){
//从kvs获取视频片段
ClipFragmentSelector clipFragmentSelector = ClipFragmentSelector
.builder()
.fragmentSelectorType(ClipFragmentSelectorType.SERVER_TIMESTAMP)
.timestampRange(ClipTimestampRange.builder().startTimestamp(Instant.ofEpochMilli(time-101000)).endTimestamp(Instant.ofEpochMilli(time+10*1000)).build()).build();
GetClipRequest request = GetClipRequest.builder().streamName(streamName).clipFragmentSelector(clipFragmentSelector).build();
File tempFile;
Path path = null;
try {
tempFile = File.createTempFile(“kvs-”, “.mkv”);
path = tempFile.toPath();
} catch (IOException e) {
logger.error(“创建临时文件失败–>{}”, e.getMessage());
}
if (path == null){
logger.error(“获取临时文件目录失败”);
return;
}
CompletableFuture result = kvsClient.getClip(request, path);
GetClipResponse getClipResponse;
try {
getClipResponse = result.get();
} catch (Exception e) {
logger.error(“GetClip error”, e);
return;
}
if (!getClipResponse.sdkHttpResponse().isSuccessful()){
logger.error(“GetClip error, response code: {}, requestId:{}”, getClipResponse.sdkHttpResponse().statusCode(),getClipResponse.responseMetadata().requestId());
}
//上传视频到s3
s3Utils.uploadObject(bucketName, “test.mkv”, path.toString());
//提取视频帧
InputStream inputStream = ExtractFrame.extractFrame(path.toString());
//上传视频帧到s3
s3Utils.uploadObject(bucketName, “test.png”, inputStream);
}
private AwsKvsUtils() {
}
private AwsKvsUtils(KinesisVideoArchivedMediaAsyncClient kvsClient, S3Utils s3Utils) {
AwsKvsUtils.kvsClient = kvsClient;
AwsKvsUtils.s3Utils = s3Utils;
}
public static void setKvsClient(KinesisVideoArchivedMediaAsyncClient kvsClient) {
AwsKvsUtils.kvsClient = kvsClient;
}
public static S3Utils getS3Utils() {
return s3Utils;
}
public static void setS3Utils(S3Utils s3Utils) {
AwsKvsUtils.s3Utils = s3Utils;
}
public static void main(String[] args) throws URISyntaxException {
//获取到凭证对象
String streamName = “xxx”;
String accessKey = “xxxx”;
String secretKey = “xxxxx”;
StaticCredentialsProvider staticCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
//获取kvs访问端点
KinesisVideoClient kinesisVideoClient = KinesisVideoClient.builder().credentialsProvider(staticCredentialsProvider).region(Region.US_EAST_1).build();
GetDataEndpointResponse dataEndpoint = kinesisVideoClient.getDataEndpoint(GetDataEndpointRequest.builder().apiName(APIName.GET_CLIP).streamName(streamName).build());
//构建kvs工具类
AwsKvsUtils awsKvsUtils = AwsKvsUtils.build(staticCredentialsProvider, new URI(dataEndpoint.dataEndpoint()), Region.US_EAST_1);
//1684569276774L是事件发生时间戳,kvs-video-storage是s3上的桶名
awsKvsUtils.getClip(streamName, 1684569276774L, “kvs-video-storage”);
}
}
维护好设备和视频url、截图url的关系,用户可以通过s3来查看以往的视频,通过kvs查看直播
注意:ffmpeg-platform
包比较大,生产时最好把ffmpeg-platform
包进行分层
测试效果
可以在Aws的iot core进行模拟设备上报,也可以在Lambda详情页面配置好事件参数进行测试
这里演示Iot模拟设备上报进行测试]
1.提前上传好视频,拿到对应的时间戳,上传视频参考:使用Kinesis Video Streams上传视频 (hanjiang.work)
2.配置消息路由的规则
3.使用页面的MQTT 测试客户端模拟设备上报
主题:TEST_MAC/TEST_MAC/user/event/trigger
负载:{"time":1685611621116,"data":{"triggerTime":1685611621116,"type":1}}
4.查看s3上是否出现了视频和截图,出现说明执行成功
评论