#include "AwsClient.h"

#include <cstddef>
#include <cstring>
#include <memory>
#include <sstream>
#include <string>

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <oxf/Util/onceToken.h>

#include "Logger.h"
#include "common/Config.h"
namespace iCG {
/// Constructs a client and registers the process-wide AWS SDK lifecycle hooks.
///
/// A function-local static `oxf::onceToken` is handed two callbacks: one that
/// calls Aws::InitAPI and one that calls Aws::ShutdownAPI. Because the token
/// is static it is built only the first time any AwsClient is constructed.
/// NOTE(review): presumably onceToken runs the first callback on construction
/// and the second on destruction (i.e. at process exit) -- confirm against
/// oxf/Util/onceToken.h.
AwsClient::AwsClient() {
  // Capturing a reference variable by copy stores a copy of the referenced
  // object in the closure. This keeps the callbacks independent of `this`:
  // the static token outlives the instance that happened to create it, so a
  // captured `this` would dangle when the shutdown callback finally runs.
  const auto &options = _options;
  static oxf::onceToken token([options]() { Aws::InitAPI(options); },
                              [options]() { Aws::ShutdownAPI(options); });
}
/// Destructor: performs no work of its own; members release themselves.
AwsClient::~AwsClient() = default;
void AwsClient::Init(std::string &addr, const std::string &name,
const std::string &pwd) {
_addr = addr;
// _options.loggingOptions.logLevel =
// Aws::Utils::Logging::LogLevel::Debug;
Aws::Client::ClientConfiguration cfg;
cfg.endpointOverride = addr.c_str();
cfg.scheme = Aws::Http::Scheme::HTTP;
cfg.verifySSL = false;
cfg.connectTimeoutMs = Config::Instance()._minioInfo.GetConnectTimeout();
cfg.requestTimeoutMs = Config::Instance()._minioInfo.GetWriteTimeout();
cfg.httpRequestTimeoutMs = Config::Instance()._minioInfo.GetWriteTimeout();
std::shared_ptrAws::Client::RetryStrategy retry;
retry.reset(new Aws::Client::DefaultRetryStrategy(
Config::Instance()._minioInfo.GetRetryCount()));
cfg.retryStrategy = retry; // assign to client_config
Aws::Auth::AWSCredentials cred(name.c_str(), pwd.c_str());
_client = std::make_sharedAws::S3::S3Client(
cred, cfg, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always,
false);
}
/// Appends a 44-byte PCM WAV (RIFF) header to `*headStr`.
///
/// @param headStr      Destination string; the header bytes are appended to it.
/// @param channels     Number of audio channels.
/// @param bits         Bits per sample.
/// @param sample_rate  Samples per second.
/// @param len          Size in bytes of the PCM payload that will follow.
///
/// NOTE(review): the header is copied straight out of `head_data_t`, so this
/// assumes that struct is laid out as the 44-byte on-disk header with no
/// padding -- confirm against its declaration.
void AwsClient::pcmAddWavHeader(std::string *headStr, int channels, int bits,
                                int sample_rate, int len) {
  // Each chunk tag is exactly four characters, stored without a terminator.
  constexpr std::size_t kTagSize = 4;
  // Number of header bytes emitted.
  constexpr std::size_t kWavHeaderSize = 44;

  head_data_t pcm2wavHEAD;
  memcpy(pcm2wavHEAD.riffType, "RIFF", kTagSize);
  memcpy(pcm2wavHEAD.wavType, "WAVE", kTagSize);
  // 36 = 44-byte header minus the 8 bytes taken by the RIFF tag and this
  // size field.
  pcm2wavHEAD.riffSize = 36 + len;
  pcm2wavHEAD.sampleRate = sample_rate;
  pcm2wavHEAD.bitsPerSample = bits;
  memcpy(pcm2wavHEAD.formatType, "fmt ", kTagSize);
  pcm2wavHEAD.formatSize = 16;
  pcm2wavHEAD.numChannels = channels;
  // Bytes per sample frame across all channels.
  pcm2wavHEAD.blockAlign = channels * bits / 8;
  pcm2wavHEAD.compressionCode = 1;  // 1 is the WAV format tag for PCM
  pcm2wavHEAD.bytesPerSecond =
      pcm2wavHEAD.sampleRate * pcm2wavHEAD.blockAlign;
  memcpy(pcm2wavHEAD.dataType, "data", kTagSize);
  pcm2wavHEAD.dataSize = len;

  headStr->append(reinterpret_cast<const char *>(&pcm2wavHEAD),
                  kWavHeaderSize);
  // fseek(fp, 0, SEEK_SET);
  // fwrite(&pcm2wavHEAD, 44, 1, fp);
}
/// Uploads `content` to `bucket` under `key` with a synchronous PutObject.
///
/// @param key        Object key.
/// @param content    Raw bytes to store.
/// @param bucket     Destination bucket name.
/// @param audioFlag  When true, a WAV header (mono, 16-bit, 8000 Hz) sized for
///                   `content` is written in front of the payload.
/// @return true on success; false if the request failed, in which case
///         `_health` is also cleared.
///
/// `_count` is decremented once per call regardless of the outcome.
/// NOTE(review): presumably the caller increments `_count` before dispatching
/// the upload -- confirm.
bool AwsClient::Upload(const std::string &key, const std::string &content,
                       std::string bucket, bool audioFlag) {
  // Fixed PCM format used for audio uploads.
  constexpr int kChannels = 1;
  constexpr int kBitsPerSample = 16;
  constexpr int kSampleRate = 8000;

  bool ret = true;

  // Stream the (optional) WAV header and the payload straight into the
  // request body instead of first concatenating them into a temporary string.
  auto contents_stream =
      Aws::MakeShared<Aws::StringStream>("PutObjectInputStream");
  if (audioFlag) {
    std::string wavHeader;
    pcmAddWavHeader(&wavHeader, kChannels, kBitsPerSample, kSampleRate,
                    static_cast<int>(content.size()));
    contents_stream->write(wavHeader.data(),
                           static_cast<std::streamsize>(wavHeader.size()));
  }
  contents_stream->write(content.data(),
                         static_cast<std::streamsize>(content.size()));

  // Upload to MinIO.
  Aws::S3::Model::PutObjectRequest putObjectRequest;
  putObjectRequest.WithBucket(bucket.c_str()).WithKey(key.c_str());
  putObjectRequest.SetBody(contents_stream);

  // Send synchronously. (A disabled PutObjectAsync variant used to follow
  // this code; it referenced names that do not exist here and was dropped.)
  auto outcome = _client->PutObject(putObjectRequest);
  if (!outcome.IsSuccess()) {
    S_LOG_ERROR(_addr << ", Upload " << key << " failed!!!"
                      << outcome.GetError().GetMessage().c_str()
                      << static_cast<int>(outcome.GetError().GetErrorType()));
    ret = _health = false;
  } else {
    S_LOG_INFO(_addr << ", Upload " << key << " success!!! " << _count);
  }
  _count--;
  return ret;
}
/// Fetches the object stored under `key` with a synchronous GetObject.
///
/// @param key      Object key.
/// @param fileStr  Receives the object's bytes on success; left untouched on
///                 failure.
/// @return true on success; false if the request failed, in which case
///         `_downloadHealth` is also cleared.
///
/// `_count` is decremented once per call regardless of the outcome.
bool AwsClient::Download(const std::string &key, std::string &fileStr) {
  bool ret = true;

  Aws::S3::Model::GetObjectRequest object_request;
  // NOTE(review): the bucket name is an empty string here -- confirm this is
  // intentional (e.g. the key already carries the bucket) and not a leftover.
  object_request.WithBucket("").WithKey(key.c_str());

  auto get_object_outcome = _client->GetObject(object_request);
  if (get_object_outcome.IsSuccess()) {
    // Drain the whole response body into the caller's string.
    std::ostringstream body;
    body << get_object_outcome.GetResult().GetBody().rdbuf();
    fileStr = body.str();
    S_LOG_INFO(_addr << ", Download " << key << " success!!! " << _count);
  } else {
    S_LOG_ERROR(
        _addr << ", Download " << key << " failed!!!"
              << get_object_outcome.GetError().GetMessage().c_str()
              << static_cast<int>(get_object_outcome.GetError().GetErrorType()));
    ret = _downloadHealth = false;
  }
  _count--;
  return ret;
}
/// Re-probes the endpoint when either health flag has been cleared.
///
/// If both `_health` and `_downloadHealth` are already set, returns true
/// without contacting the server. Otherwise issues a ListBuckets request: on
/// failure returns false and leaves the flags as they are; on success sets
/// both flags and returns the (now set) `_health`.
bool AwsClient::checkUnHealthClient() {
  // Nothing to recover from: both paths are currently considered healthy.
  if (_health && _downloadHealth) {
    return true;
  }

  // ListBuckets serves as the connectivity probe.
  auto probeOutcome = _client->ListBuckets();
  if (!probeOutcome.IsSuccess()) {
    return false;
  }

  _downloadHealth = true;
  _health = true;
  return _health;
}
} // namespace iCG